diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java index 127230585e..911c5ed81d 100644 --- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java +++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java @@ -73,19 +73,6 @@ public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHand } try { - if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) { - log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId); - return false; - } - } catch (InterruptedException | ExecutionException e) { - log.error("Exception while deploying pipeconf to {}", deviceId, e); - return false; - } - - try { - // It would make more sense to init the stream channel once the client - // is created, but P4runtime would reject any command if a P4info has - // not been set first. if (!client.initStreamChannel().get()) { log.warn("Unable to init stream channel to {}.", deviceId); return false; @@ -95,6 +82,15 @@ public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHand return false; } + try { + if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) { + log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId); + return false; + } + } catch (InterruptedException | ExecutionException e) { + log.error("Exception while deploying pipeconf to {}", deviceId, e); + return false; + } return true; } diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java index 841678697a..7e41a9988c 100644 --- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java +++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java @@ -60,10 +60,30 @@ public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour imple @Override public CompletableFuture roleChanged(MastershipRole newRole) { + deviceId = handler().data().deviceId(); + controller = handler().get(P4RuntimeController.class); CompletableFuture result = new CompletableFuture<>(); - log.warn("roleChanged not implemented"); - result.complete(MastershipRole.MASTER); - // TODO. + + client = controller.getClient(deviceId); + if (client == null || !controller.isReacheable(deviceId)) { + result.complete(MastershipRole.STANDBY); + return result; + } + if (newRole.equals(MastershipRole.MASTER)) { + client.sendMasterArbitrationUpdate().thenAcceptAsync(success -> { + if (!success) { + log.warn("Device {} arbitration failed", deviceId); + result.complete(MastershipRole.STANDBY); + } else { + result.complete(MastershipRole.MASTER); + } + }); + } else { + // Since we don't need to do anything, we can complete it directly + // Spec: The client with the highest election id is referred to as the + // "master", while all other clients are referred to as "slaves". + result.complete(newRole); + } return result; } } diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java index 7c75c91252..5072c65613 100644 --- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java +++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java @@ -156,5 +156,12 @@ public interface P4RuntimeClient { */ void shutdown(); + /** + * Sends a master arbitration update to the device. + * + * @return a completable future containing true if the operation was successful; false otherwise + */ + CompletableFuture sendMasterArbitrationUpdate(); + // TODO: work in progress. } diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java index b095bcaa2c..3e015a4e46 100644 --- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java +++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java @@ -75,4 +75,11 @@ public interface P4RuntimeController extends ListenerService streamRequestObserver; + private Map> arbitrationUpdateMap = Maps.newConcurrentMap(); + protected Uint128 p4RuntimeElectionId; + /** * Default constructor. * @@ -257,34 +264,44 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { "dumpGroups-" + actionProfileId.id()); } + @Override + public CompletableFuture sendMasterArbitrationUpdate() { + return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate"); + } + /* Blocking method implementations below */ + private boolean doArbitrationUpdate() { + CompletableFuture result = new CompletableFuture<>(); + // TODO: currently we use 64-bit Long type for election id, should + // we use 128-bit ? + long nextElectId = controller.getNewMasterElectionId(); + Uint128 newElectionId = Uint128.newBuilder() + .setLow(nextElectId) + .build(); + MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder() + .setDeviceId(p4DeviceId) + .setElectionId(newElectionId) + .build(); + StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder() + .setArbitration(arbitrationUpdate) + .build(); + log.debug("Sending arbitration update to {} with election id {}...", + deviceId, newElectionId); + arbitrationUpdateMap.put(newElectionId, result); + try { + streamRequestObserver.onNext(requestMsg); + return result.get(); + } catch (InterruptedException | ExecutionException | StatusRuntimeException e) { + log.warn("Arbitration update failed for {} due to {}", deviceId, e); + arbitrationUpdateMap.remove(newElectionId); + return false; + } + } private boolean doInitStreamChannel() { // To listen for packets and other events, we need to start the RPC. // Here we do it by sending a master arbitration update. - log.info("initializing stream chanel on {}...", deviceId); - if (!doArbitrationUpdate()) { - log.warn("Unable to initialize stream channel for {}", deviceId); - return false; - } else { - return true; - } - } - - private boolean doArbitrationUpdate() { - log.info("Sending arbitration update to {}...", deviceId); - StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder() - .setArbitration(MasterArbitrationUpdate.newBuilder() - .setDeviceId(p4DeviceId) - .build()) - .build(); - try { - streamRequestObserver.onNext(requestMsg); - return true; - } catch (StatusRuntimeException e) { - log.warn("Arbitration update failed for {}: {}", deviceId, e); - return false; - } + return doArbitrationUpdate(); } private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) { @@ -315,6 +332,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest .newBuilder() + .setElectionId(p4RuntimeElectionId) .setAction(VERIFY_AND_COMMIT) .addConfigs(pipelineConfig) .build(); @@ -330,7 +348,6 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { private boolean doWriteTableEntries(Collection piTableEntries, WriteOperationType opType, PiPipeconf pipeconf) { - WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder(); Collection updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf) @@ -350,11 +367,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { writeRequestBuilder .setDeviceId(p4DeviceId) - /* PI ignores this ElectionId, commenting out for now. - .setElectionId(Uint128.newBuilder() - .setHigh(0) - .setLow(ELECTION_ID) - .build()) */ + .setElectionId(p4RuntimeElectionId) .addAllUpdates(updateMsgs) .build(); @@ -455,8 +468,33 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { } private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) { + log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg); - log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg); + Uint128 electionId = arbitrationMsg.getElectionId(); + CompletableFuture mastershipFeature = arbitrationUpdateMap.remove(electionId); + + if (mastershipFeature == null) { + log.warn("Can't find completable future of election id {}", electionId); + return; + } + + this.p4RuntimeElectionId = electionId; + int statusCode = arbitrationMsg.getStatus().getCode(); + MastershipRole arbitrationRole; + // arbitration update success + + if (statusCode == Status.OK.getCode().value()) { + mastershipFeature.complete(true); + arbitrationRole = MastershipRole.MASTER; + } else { + mastershipFeature.complete(false); + arbitrationRole = MastershipRole.STANDBY; + } + + DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId); + P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION, + arbitrationEventSubject); + controller.postEvent(event); } private Collection doReadCounterCells(Collection cellIds, PiPipeconf pipeconf) { @@ -490,7 +528,6 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { } private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) { - final Collection actionProfileMembers = Lists.newArrayList(); try { for (PiActionGroupMember member : group.members()) { @@ -518,6 +555,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { WriteRequest writeRequestMsg = WriteRequest.newBuilder() .setDeviceId(p4DeviceId) + .setElectionId(p4RuntimeElectionId) .addAllUpdates(updateMsgs) .build(); try { @@ -652,7 +690,6 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { } private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) { - final ActionProfileGroup actionProfileGroup; try { actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf); @@ -663,6 +700,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient { final WriteRequest writeRequestMsg = WriteRequest.newBuilder() .setDeviceId(p4DeviceId) + .setElectionId(p4RuntimeElectionId) .addUpdates(Update.newBuilder() .setEntity(Entity.newBuilder() .setActionProfileGroup(actionProfileGroup) diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java index 383b857b92..57703a1b96 100644 --- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java +++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java @@ -35,6 +35,8 @@ import org.onosproject.p4runtime.api.P4RuntimeClient; import org.onosproject.p4runtime.api.P4RuntimeController; import org.onosproject.p4runtime.api.P4RuntimeEvent; import org.onosproject.p4runtime.api.P4RuntimeEventListener; +import org.onosproject.store.service.AtomicCounter; +import org.onosproject.store.service.StorageService; import org.slf4j.Logger; import java.io.IOException; @@ -55,19 +57,26 @@ public class P4RuntimeControllerImpl extends AbstractListenerManager implements P4RuntimeController { + private static final String P4R_ELECTION = "p4runtime-election"; private final Logger log = getLogger(getClass()); private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider(); private final Map clients = Maps.newHashMap(); private final Map channelIds = Maps.newHashMap(); // TODO: should use a cache to delete unused locks. private final Map deviceLocks = Maps.newConcurrentMap(); + private AtomicCounter electionIdGenerator; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) public GrpcController grpcController; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + public StorageService storageService; + @Activate public void activate() { eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry); + electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION); + log.info("Started"); } @@ -186,6 +195,11 @@ public class P4RuntimeControllerImpl } } + @Override + public long getNewMasterElectionId() { + return electionIdGenerator.incrementAndGet(); + } + public void postEvent(P4RuntimeEvent event) { post(event); } diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java index fda505393b..debf6a6320 100644 --- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java +++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java @@ -48,6 +48,7 @@ import org.onosproject.net.pi.runtime.PiActionProfileId; import p4.P4RuntimeOuterClass.ActionProfileGroup; import p4.P4RuntimeOuterClass.ActionProfileMember; import p4.P4RuntimeOuterClass.Entity; +import p4.P4RuntimeOuterClass.Uint128; import p4.P4RuntimeOuterClass.Update; import p4.P4RuntimeOuterClass.WriteRequest; @@ -101,6 +102,7 @@ public class P4RuntimeGroupTest { private static final int SET_EGRESS_PORT_ID = 16794308; private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest"; private static final long DEFAULT_TIMEOUT_TIME = 10; + private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build(); private P4RuntimeClientImpl client; private P4RuntimeControllerImpl controller; @@ -156,6 +158,7 @@ public class P4RuntimeGroupTest { client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID, grpcChannel, controller); + client.p4RuntimeElectionId = DEFAULT_ELECTION_ID; } @Test @@ -166,6 +169,7 @@ public class P4RuntimeGroupTest { WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0); assertEquals(1, result.getDeviceId()); assertEquals(1, result.getUpdatesCount()); + assertEquals(DEFAULT_ELECTION_ID, result.getElectionId()); Update update = result.getUpdatesList().get(0); assertEquals(Update.Type.INSERT, update.getType()); @@ -194,6 +198,7 @@ public class P4RuntimeGroupTest { WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0); assertEquals(1, result.getDeviceId()); assertEquals(3, result.getUpdatesCount()); + assertEquals(DEFAULT_ELECTION_ID, result.getElectionId()); List updates = result.getUpdatesList(); for (Update update : updates) { diff --git a/protocols/p4runtime/proto/BUCK b/protocols/p4runtime/proto/BUCK index a50c71fbc3..f8623f867d 100644 --- a/protocols/p4runtime/proto/BUCK +++ b/protocols/p4runtime/proto/BUCK @@ -5,7 +5,7 @@ include_defs( PROTOBUF_VER = '3.0.2' GRPC_VER = '1.3.0' -PI_COMMIT = '9fc50cd0a0187eb1346272524d4b8bafb51bb513' +PI_COMMIT = 'a8814a8ac40838a9df83fe47a17a025b69026fcf' PI_BASEURL = 'https://github.com/p4lang/PI.git' # Wondering which .proto files to build? Check p4runtime's Makefile: diff --git a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java index f352d45e7f..19a42b39c1 100644 --- a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java +++ b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java @@ -148,6 +148,10 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP @Override public void event(P4RuntimeEvent event) { + if (event.type() != P4RuntimeEvent.Type.PACKET_IN) { + // Not a packet-in event, ignore it. + return; + } P4RuntimePacketIn eventSubject = (P4RuntimePacketIn) event.subject(); DeviceId deviceId = eventSubject.deviceId(); diff --git a/tools/dev/bin/onos-setup-p4-dev b/tools/dev/bin/onos-setup-p4-dev index 7f36d4749d..ac12c3dd0e 100755 --- a/tools/dev/bin/onos-setup-p4-dev +++ b/tools/dev/bin/onos-setup-p4-dev @@ -15,8 +15,8 @@ set -e BUILD_DIR=~/p4tools -BMV2_COMMIT="4eeb8dad8e8f062636f9d0d8296aa7f288c6f6dd" -PI_COMMIT="9fc50cd0a0187eb1346272524d4b8bafb51bb513" +BMV2_COMMIT="44ac9c21636b00fed660ae8590889d85b5d4df4c" +PI_COMMIT="a8814a8ac40838a9df83fe47a17a025b69026fcf" P4C_COMMIT="040b931fbfcb7912e3a14cd05df950fbdd49b038" PROTOBUF_COMMIT="tags/v3.0.2" GRPC_COMMIT="tags/v1.3.0"