[ONOS-6810] Implement Mastership handling in general DeviceProvider

Change-Id: I14b706d364cf5124da248230fbcda65d0bd284ce
This commit is contained in:
Andrea Campanella 2017-07-24 18:11:36 +02:00
parent 6987e0e76c
commit 14e196dbcd
5 changed files with 185 additions and 56 deletions

View File

@ -28,6 +28,9 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
@ -78,6 +81,9 @@ public class PiPipeconfManager implements PiPipeconfService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@ -87,6 +93,9 @@ public class PiPipeconfManager implements PiPipeconfService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PiPipeconfMappingStore pipeconfMappingStore;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
// Registered pipeconf are replicated through the app subsystem and registered on app activated events.
protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
@ -200,21 +209,28 @@ public class PiPipeconfManager implements PiPipeconfService {
// due to 1:1:1 pipeconf:driver:provider maybe find better way
DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
//we register to the dirver susbystem the driver provider containing the merged driver
//we register to the driver susbystem the driver provider containing the merged driver
driverAdminService.registerProvider(provider);
}
// Changing the configuration for the device to enforce the full driver with pipipeconf
// and base behaviours
ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
newCfg = newCfg.put(DRIVER, completeDriverName);
ObjectMapper mapper = new ObjectMapper();
JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
// Completable future is needed for when this method will also apply the pipeline to the device.
// FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here
// the association between device and pipeconf.
pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
// and base behaviours, updating binding only first time something changes
NodeId leaderNodeId = leadershipService.getLeader("deploy-" +
deviceId.toString() + "-pipeconf");
NodeId localNodeId = clusterService.getLocalNode().id();
if (!basicDeviceConfig.driver().equals(completeDriverName) && localNodeId.equals(leaderNodeId)) {
ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
newCfg = newCfg.put(DRIVER, completeDriverName);
ObjectMapper mapper = new ObjectMapper();
JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
log.debug("New driver {} for device {}", completeDriverName, deviceId);
cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
// Completable future is needed for when this method will also apply the pipeline to the device.
// FIXME (maybe): the pipeline is currently applied by the general device provider.
// But we store here the association between device and pipeconf.
pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
}
operationResult.complete(true);
}
});

View File

@ -66,7 +66,7 @@ public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour imple
client = controller.getClient(deviceId);
if (client == null || !controller.isReacheable(deviceId)) {
result.complete(MastershipRole.STANDBY);
result.complete(MastershipRole.NONE);
return result;
}
if (newRole.equals(MastershipRole.MASTER)) {

View File

@ -168,6 +168,8 @@ public class GrpcControllerImpl implements GrpcController {
doDummyMessage(channels.get(channelId));
return true;
} catch (IOException e) {
log.warn("Error in sending dummy message to device {}", channelId);
log.debug("Exception ", e);
return false;
}
} finally {

View File

@ -30,7 +30,11 @@ import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
@ -110,6 +114,10 @@ import static org.slf4j.LoggerFactory.getLogger;
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
public static final String DRIVER = "driver";
public static final int REACHABILITY_TIMEOUT = 10;
public static final String DEPLOY = "deploy-";
public static final String PIPECONF_TOPIC = "-pipeconf";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@ -130,9 +138,18 @@ public class GeneralDeviceProvider extends AbstractProvider
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PiPipeconfService piPipeconfService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
@Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
label = "Configure poll frequency for port status and statistics; " +
@ -257,17 +274,26 @@ public class GeneralDeviceProvider extends AbstractProvider
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
log.debug("Received role {} for device {}", newRole, deviceId);
log.info("Received role {} for device {}", newRole, deviceId);
CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
roleReply.thenAcceptAsync(mastership -> {
providerService.receivedRoleReply(deviceId, newRole, mastership);
if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
scheduledTasks.get(deviceId).cancel(false);
scheduledTasks.remove(deviceId);
} else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
updatePortStatistics(deviceId);
}
});
}
@Override
public boolean isReachable(DeviceId deviceId) {
log.debug("Testing rechability for device {}", deviceId);
log.debug("Testing reachability for device {}", deviceId);
CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
try {
return reachable.get(10, TimeUnit.SECONDS);
return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Device {} is not reachable", deviceId, e);
return false;
@ -368,15 +394,7 @@ public class GeneralDeviceProvider extends AbstractProvider
return;
}
//Storing deviceKeyId and all other config values
// as data in the driver with protocol_<info>
// name as the key. e.g protocol_ip
providerConfig.protocolsInfo()
.forEach((protocol, deviceInfoConfig) -> {
deviceInfoConfig.configValues()
.forEach((k, v) -> driverData.set(protocol + "_" + k, v));
driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
});
addConfigData(providerConfig, driverData);
//Connecting to the device
CompletableFuture<Boolean> connected = handshaker.connect();
@ -409,7 +427,7 @@ public class GeneralDeviceProvider extends AbstractProvider
ports = deviceDiscovery.discoverPortDetails();
}
if (!handlePipeconf(deviceId, driver, driverData)) {
if (!handlePipeconf(deviceId, driver, driverData, true)) {
// Something went wrong during handling of pipeconf.
// We already logged the error.
handshaker.disconnect();
@ -425,11 +443,37 @@ public class GeneralDeviceProvider extends AbstractProvider
}
}
private void connectStandbyDevice(DeviceId deviceId) {
//if device is pipeline programmable we merge pipeconf + base driver for every other role
GeneralProviderDeviceConfig providerConfig =
cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
Driver driver = getDriver(deviceId);
DriverData driverData = new DefaultDriverData(driver, deviceId);
DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
if (handshaker == null) {
log.error("Device {}, with driver {} does not support DeviceHandshaker " +
"behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
return;
}
addConfigData(providerConfig, driverData);
//Connecting to the device
handshaker.connect().thenAcceptAsync(result -> {
if (result) {
handlePipeconf(deviceId, driver, driverData, false);
}
});
}
/**
* Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
* device can be registered to the core, false otherwise.
*/
private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
driverData);
@ -439,6 +483,42 @@ public class GeneralDeviceProvider extends AbstractProvider
return true;
}
PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
if (pipeconf != null) {
PiPipeconfId pipeconfId = pipeconf.id();
try {
if (deployPipeconf) {
if (!pipelineProg.deployPipeconf(pipeconf).get()) {
log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
pipeconfId, deviceId);
return false;
}
}
} catch (InterruptedException | ExecutionException e) {
log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
return false;
}
try {
if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
driver.name(), deviceId, pipeconfId);
return false;
}
} catch (InterruptedException | ExecutionException e) {
log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
return false;
}
} else {
return false;
}
return true;
}
private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
// No pipeconf has been associated with this device.
// Check if device driver provides a default one.
@ -453,33 +533,16 @@ public class GeneralDeviceProvider extends AbstractProvider
if (pipeconfId == null) {
log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
return false;
return null;
}
if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
log.warn("Pipeconf {} is not registered", pipeconfId);
return false;
return null;
}
PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
try {
if (!pipelineProg.deployPipeconf(pipeconf).get()) {
log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
return false;
}
if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
driver.name(), deviceId, pipeconfId);
return false;
}
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
return true;
return piPipeconfService.getPipeconf(pipeconfId).get();
}
private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
@ -492,7 +555,6 @@ public class GeneralDeviceProvider extends AbstractProvider
DeviceHandshaker handshaker = getHandshaker(deviceId);
if (handshaker != null) {
CompletableFuture<Boolean> disconnect = handshaker.disconnect();
disconnect.thenAcceptAsync(result -> {
if (result) {
log.info("Disconnected device {}", deviceId);
@ -560,6 +622,28 @@ public class GeneralDeviceProvider extends AbstractProvider
log.info("Device {} is already connected to ONOS and is available", deviceId);
return;
}
NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
.leader().nodeId();
NodeId localNodeId = clusterService.getLocalNode().id();
if (localNodeId.equals(leaderNodeId)) {
if (processEvent(event, deviceId)) {
log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
deviceId);
checkAndSubmitDeviceTask(deviceId);
}
} else {
if (processEvent(event, deviceId)) {
log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
localNodeId, deviceId, leaderNodeId);
connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
//FIXME this will be removed when config is synced
cleanUpConfigInfo(deviceId);
}
}
}
private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
//FIXME to be removed when netcfg will issue device events in a bundle or
// ensure all configuration needed is present
Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
@ -590,7 +674,7 @@ public class GeneralDeviceProvider extends AbstractProvider
// in the pipelineConfigured
if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
&& pipelineConfigured.contains(deviceId)) {
checkAndSubmitDeviceTask(deviceId);
return true;
} else {
if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
log.debug("Waiting for pipeline configuration for device {}", deviceId);
@ -604,6 +688,7 @@ public class GeneralDeviceProvider extends AbstractProvider
log.debug("Only device configuration for device {}", deviceId);
}
}
return false;
} finally {
lock.unlock();
}
@ -622,10 +707,26 @@ public class GeneralDeviceProvider extends AbstractProvider
private void checkAndSubmitDeviceTask(DeviceId deviceId) {
connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
//FIXME this will be removed when configuration is synced.
cleanUpConfigInfo(deviceId);
}
private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
//Storing deviceKeyId and all other config values
// as data in the driver with protocol_<info>
// name as the key. e.g protocol_ip
providerConfig.protocolsInfo()
.forEach((protocol, deviceInfoConfig) -> {
deviceInfoConfig.configValues()
.forEach((k, v) -> driverData.set(protocol + "_" + k, v));
driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
});
}
private void cleanUpConfigInfo(DeviceId deviceId) {
deviceConfigured.remove(deviceId);
driverConfigured.remove(deviceId);
pipelineConfigured.remove(deviceId);
}
private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
@ -650,10 +751,13 @@ public class GeneralDeviceProvider extends AbstractProvider
//For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
updatePortStatistics(deviceId);
if (mastershipService.isLocalMaster(deviceId)) {
scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
updatePortStatistics(deviceId);
}
} else if (type.equals(Type.DEVICE_REMOVED)) {
connectionExecutor.submit(exceptionSafe(() ->
disconnectDevice(deviceId)));
}

View File

@ -22,6 +22,7 @@ import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
@ -67,6 +68,9 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
private PacketProviderService providerService;
private InternalPacketListener packetListener = new InternalPacketListener();
@ -98,7 +102,7 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP
if (packet != null) {
DeviceId deviceId = packet.sendThrough();
Device device = deviceService.getDevice(deviceId);
if (device.is(PacketProgrammable.class)) {
if (device.is(PacketProgrammable.class) && mastershipService.isLocalMaster(deviceId)) {
PacketProgrammable packetProgrammable = device.as(PacketProgrammable.class);
packetProgrammable.emit(packet);
} else {
@ -148,7 +152,10 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP
@Override
public void event(P4RuntimeEvent event) {
if (event.type() != P4RuntimeEvent.Type.PACKET_IN) {
//Masterhip message is sent to everybody but picked up only by master.
//FIXME we need the device ID into p4RuntimeEvnetSubject to check for mastsership
if (!(event.subject() instanceof P4RuntimePacketIn) || event.type() != P4RuntimeEvent.Type.PACKET_IN) {
log.debug("Event type {}", event.type());
// Not a packet-in event, ignore it.
return;
}
@ -163,7 +170,7 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP
if (!device.is(PiPipelineInterpreter.class)) {
log.warn("Unable to process packet-in from {}, device has no PiPipelineInterpreter behaviour",
deviceId);
deviceId);
return;
}
@ -184,7 +191,7 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP
log.debug("Processing inbound packet: {}", inPkt.toString());
OutboundPacket outPkt = new DefaultOutboundPacket(eventSubject.deviceId(), null,
operation.data().asReadOnlyBuffer());
operation.data().asReadOnlyBuffer());
PacketContext pktCtx = new P4RuntimePacketContext(System.currentTimeMillis(), inPkt, outPkt, false);
// Pushing the packet context up for processing.