diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java index d60e9b35e9..212cf1ced7 100644 --- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java +++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java @@ -28,6 +28,9 @@ import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onlab.util.ItemNotFoundException; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; import org.onosproject.net.DeviceId; import org.onosproject.net.config.ConfigFactory; import org.onosproject.net.config.NetworkConfigEvent; @@ -78,6 +81,9 @@ public class PiPipeconfManager implements PiPipeconfService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected NetworkConfigRegistry cfgService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected LeadershipService leadershipService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DriverService driverService; @@ -87,6 +93,9 @@ public class PiPipeconfManager implements PiPipeconfService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected PiPipeconfMappingStore pipeconfMappingStore; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + // Registered pipeconf are replicated through the app subsystem and registered on app activated events. protected ConcurrentHashMap piPipeconfs = new ConcurrentHashMap<>(); @@ -200,21 +209,28 @@ public class PiPipeconfManager implements PiPipeconfService { // due to 1:1:1 pipeconf:driver:provider maybe find better way DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver); - //we register to the dirver susbystem the driver provider containing the merged driver + //we register to the driver susbystem the driver provider containing the merged driver driverAdminService.registerProvider(provider); } // Changing the configuration for the device to enforce the full driver with pipipeconf - // and base behaviours - ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node(); - newCfg = newCfg.put(DRIVER, completeDriverName); - ObjectMapper mapper = new ObjectMapper(); - JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class); - cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode); - // Completable future is needed for when this method will also apply the pipeline to the device. - // FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here - // the association between device and pipeconf. - pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId); + // and base behaviours, updating binding only first time something changes + NodeId leaderNodeId = leadershipService.getLeader("deploy-" + + deviceId.toString() + "-pipeconf"); + NodeId localNodeId = clusterService.getLocalNode().id(); + + if (!basicDeviceConfig.driver().equals(completeDriverName) && localNodeId.equals(leaderNodeId)) { + ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node(); + newCfg = newCfg.put(DRIVER, completeDriverName); + ObjectMapper mapper = new ObjectMapper(); + JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class); + log.debug("New driver {} for device {}", completeDriverName, deviceId); + cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode); + // Completable future is needed for when this method will also apply the pipeline to the device. + // FIXME (maybe): the pipeline is currently applied by the general device provider. + // But we store here the association between device and pipeconf. + pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId); + } operationResult.complete(true); } }); diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java index 7e41a9988c..f49ad1853f 100644 --- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java +++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java @@ -66,7 +66,7 @@ public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour imple client = controller.getClient(deviceId); if (client == null || !controller.isReacheable(deviceId)) { - result.complete(MastershipRole.STANDBY); + result.complete(MastershipRole.NONE); return result; } if (newRole.equals(MastershipRole.MASTER)) { diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java index 91b9db0cc6..269936a5b9 100644 --- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java +++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java @@ -168,6 +168,8 @@ public class GrpcControllerImpl implements GrpcController { doDummyMessage(channels.get(channelId)); return true; } catch (IOException e) { + log.warn("Error in sending dummy message to device {}", channelId); + log.debug("Exception ", e); return false; } } finally { diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java index a77dc336e2..824ec79660 100644 --- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java +++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java @@ -30,7 +30,11 @@ import org.onlab.packet.ChassisId; import org.onlab.util.ItemNotFoundException; import org.onlab.util.Tools; import org.onosproject.cfg.ComponentConfigService; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; import org.onosproject.core.CoreService; +import org.onosproject.mastership.MastershipService; import org.onosproject.net.AnnotationKeys; import org.onosproject.net.DefaultAnnotations; import org.onosproject.net.Device; @@ -110,6 +114,10 @@ import static org.slf4j.LoggerFactory.getLogger; public class GeneralDeviceProvider extends AbstractProvider implements DeviceProvider { public static final String DRIVER = "driver"; + public static final int REACHABILITY_TIMEOUT = 10; + public static final String DEPLOY = "deploy-"; + public static final String PIPECONF_TOPIC = "-pipeconf"; + private final Logger log = getLogger(getClass()); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @@ -130,9 +138,18 @@ public class GeneralDeviceProvider extends AbstractProvider @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DriverService driverService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected PiPipeconfService piPipeconfService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected LeadershipService leadershipService; + private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10; @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS, label = "Configure poll frequency for port status and statistics; " + @@ -257,17 +274,26 @@ public class GeneralDeviceProvider extends AbstractProvider @Override public void roleChanged(DeviceId deviceId, MastershipRole newRole) { - log.debug("Received role {} for device {}", newRole, deviceId); + log.info("Received role {} for device {}", newRole, deviceId); CompletableFuture roleReply = getHandshaker(deviceId).roleChanged(newRole); - roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership)); + roleReply.thenAcceptAsync(mastership -> { + providerService.receivedRoleReply(deviceId, newRole, mastership); + if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) { + scheduledTasks.get(deviceId).cancel(false); + scheduledTasks.remove(deviceId); + } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) { + scheduledTasks.put(deviceId, schedulePolling(deviceId, false)); + updatePortStatistics(deviceId); + } + }); } @Override public boolean isReachable(DeviceId deviceId) { - log.debug("Testing rechability for device {}", deviceId); + log.debug("Testing reachability for device {}", deviceId); CompletableFuture reachable = getHandshaker(deviceId).isReachable(); try { - return reachable.get(10, TimeUnit.SECONDS); + return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("Device {} is not reachable", deviceId, e); return false; @@ -368,15 +394,7 @@ public class GeneralDeviceProvider extends AbstractProvider return; } - //Storing deviceKeyId and all other config values - // as data in the driver with protocol_ - // name as the key. e.g protocol_ip - providerConfig.protocolsInfo() - .forEach((protocol, deviceInfoConfig) -> { - deviceInfoConfig.configValues() - .forEach((k, v) -> driverData.set(protocol + "_" + k, v)); - driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId()); - }); + addConfigData(providerConfig, driverData); //Connecting to the device CompletableFuture connected = handshaker.connect(); @@ -409,7 +427,7 @@ public class GeneralDeviceProvider extends AbstractProvider ports = deviceDiscovery.discoverPortDetails(); } - if (!handlePipeconf(deviceId, driver, driverData)) { + if (!handlePipeconf(deviceId, driver, driverData, true)) { // Something went wrong during handling of pipeconf. // We already logged the error. handshaker.disconnect(); @@ -425,11 +443,37 @@ public class GeneralDeviceProvider extends AbstractProvider } } + private void connectStandbyDevice(DeviceId deviceId) { + + //if device is pipeline programmable we merge pipeconf + base driver for every other role + GeneralProviderDeviceConfig providerConfig = + cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class); + + Driver driver = getDriver(deviceId); + + + DriverData driverData = new DefaultDriverData(driver, deviceId); + DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData); + if (handshaker == null) { + log.error("Device {}, with driver {} does not support DeviceHandshaker " + + "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours()); + return; + } + addConfigData(providerConfig, driverData); + + //Connecting to the device + handshaker.connect().thenAcceptAsync(result -> { + if (result) { + handlePipeconf(deviceId, driver, driverData, false); + } + }); + } + /** * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the * device can be registered to the core, false otherwise. */ - private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) { + private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) { PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class, driverData); @@ -439,6 +483,42 @@ public class GeneralDeviceProvider extends AbstractProvider return true; } + PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg); + + if (pipeconf != null) { + + PiPipeconfId pipeconfId = pipeconf.id(); + + try { + if (deployPipeconf) { + if (!pipelineProg.deployPipeconf(pipeconf).get()) { + log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", + pipeconfId, deviceId); + return false; + } + } + } catch (InterruptedException | ExecutionException e) { + log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e); + return false; + } + try { + if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) { + log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery", + driver.name(), deviceId, pipeconfId); + return false; + } + } catch (InterruptedException | ExecutionException e) { + log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e); + return false; + } + } else { + return false; + } + + return true; + } + + private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) { PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> { // No pipeconf has been associated with this device. // Check if device driver provides a default one. @@ -453,33 +533,16 @@ public class GeneralDeviceProvider extends AbstractProvider if (pipeconfId == null) { log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId); - return false; + return null; } if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) { log.warn("Pipeconf {} is not registered", pipeconfId); - return false; + return null; } - PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get(); - - try { - if (!pipelineProg.deployPipeconf(pipeconf).get()) { - log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId); - return false; - } - - if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) { - log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery", - driver.name(), deviceId, pipeconfId); - return false; - } - } catch (InterruptedException | ExecutionException e) { - throw new IllegalStateException(e); - } - - return true; + return piPipeconfService.getPipeconf(pipeconfId).get(); } private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List ports) { @@ -492,7 +555,6 @@ public class GeneralDeviceProvider extends AbstractProvider DeviceHandshaker handshaker = getHandshaker(deviceId); if (handshaker != null) { CompletableFuture disconnect = handshaker.disconnect(); - disconnect.thenAcceptAsync(result -> { if (result) { log.info("Disconnected device {}", deviceId); @@ -560,6 +622,28 @@ public class GeneralDeviceProvider extends AbstractProvider log.info("Device {} is already connected to ONOS and is available", deviceId); return; } + NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC) + .leader().nodeId(); + NodeId localNodeId = clusterService.getLocalNode().id(); + if (localNodeId.equals(leaderNodeId)) { + if (processEvent(event, deviceId)) { + log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId, + deviceId); + checkAndSubmitDeviceTask(deviceId); + } + } else { + if (processEvent(event, deviceId)) { + log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER", + localNodeId, deviceId, leaderNodeId); + connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId))); + //FIXME this will be removed when config is synced + cleanUpConfigInfo(deviceId); + } + } + + } + + private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) { //FIXME to be removed when netcfg will issue device events in a bundle or // ensure all configuration needed is present Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock()); @@ -590,7 +674,7 @@ public class GeneralDeviceProvider extends AbstractProvider // in the pipelineConfigured if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId) && pipelineConfigured.contains(deviceId)) { - checkAndSubmitDeviceTask(deviceId); + return true; } else { if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) { log.debug("Waiting for pipeline configuration for device {}", deviceId); @@ -604,6 +688,7 @@ public class GeneralDeviceProvider extends AbstractProvider log.debug("Only device configuration for device {}", deviceId); } } + return false; } finally { lock.unlock(); } @@ -622,10 +707,26 @@ public class GeneralDeviceProvider extends AbstractProvider private void checkAndSubmitDeviceTask(DeviceId deviceId) { connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId))); //FIXME this will be removed when configuration is synced. + cleanUpConfigInfo(deviceId); + + } + + private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) { + //Storing deviceKeyId and all other config values + // as data in the driver with protocol_ + // name as the key. e.g protocol_ip + providerConfig.protocolsInfo() + .forEach((protocol, deviceInfoConfig) -> { + deviceInfoConfig.configValues() + .forEach((k, v) -> driverData.set(protocol + "_" + k, v)); + driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId()); + }); + } + + private void cleanUpConfigInfo(DeviceId deviceId) { deviceConfigured.remove(deviceId); driverConfigured.remove(deviceId); pipelineConfigured.remove(deviceId); - } private ScheduledFuture schedulePolling(DeviceId deviceId, boolean randomize) { @@ -650,10 +751,13 @@ public class GeneralDeviceProvider extends AbstractProvider //For now this is scheduled periodically, when streaming API will // be available we check and base it on the streaming API (e.g. gNMI) - scheduledTasks.put(deviceId, schedulePolling(deviceId, false)); - updatePortStatistics(deviceId); + if (mastershipService.isLocalMaster(deviceId)) { + scheduledTasks.put(deviceId, schedulePolling(deviceId, false)); + updatePortStatistics(deviceId); + } } else if (type.equals(Type.DEVICE_REMOVED)) { + connectionExecutor.submit(exceptionSafe(() -> disconnectDevice(deviceId))); } diff --git a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java index fb4a97a895..8693effbe0 100644 --- a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java +++ b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java @@ -22,6 +22,7 @@ import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.onosproject.mastership.MastershipService; import org.onosproject.net.Device; import org.onosproject.net.DeviceId; import org.onosproject.net.device.DeviceService; @@ -67,6 +68,9 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceService deviceService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + private PacketProviderService providerService; private InternalPacketListener packetListener = new InternalPacketListener(); @@ -98,7 +102,7 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP if (packet != null) { DeviceId deviceId = packet.sendThrough(); Device device = deviceService.getDevice(deviceId); - if (device.is(PacketProgrammable.class)) { + if (device.is(PacketProgrammable.class) && mastershipService.isLocalMaster(deviceId)) { PacketProgrammable packetProgrammable = device.as(PacketProgrammable.class); packetProgrammable.emit(packet); } else { @@ -148,7 +152,10 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP @Override public void event(P4RuntimeEvent event) { - if (event.type() != P4RuntimeEvent.Type.PACKET_IN) { + //Masterhip message is sent to everybody but picked up only by master. + //FIXME we need the device ID into p4RuntimeEvnetSubject to check for mastsership + if (!(event.subject() instanceof P4RuntimePacketIn) || event.type() != P4RuntimeEvent.Type.PACKET_IN) { + log.debug("Event type {}", event.type()); // Not a packet-in event, ignore it. return; } @@ -163,7 +170,7 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP if (!device.is(PiPipelineInterpreter.class)) { log.warn("Unable to process packet-in from {}, device has no PiPipelineInterpreter behaviour", - deviceId); + deviceId); return; } @@ -184,7 +191,7 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP log.debug("Processing inbound packet: {}", inPkt.toString()); OutboundPacket outPkt = new DefaultOutboundPacket(eventSubject.deviceId(), null, - operation.data().asReadOnlyBuffer()); + operation.data().asReadOnlyBuffer()); PacketContext pktCtx = new P4RuntimePacketContext(System.currentTimeMillis(), inPkt, outPkt, false); // Pushing the packet context up for processing.