[ONOS-7143] Add arbitration update support by P4RuntimeClient

Change-Id: I671275576018d50447f969166a7b42a28dd93b1d
This commit is contained in:
Yi Tseng 2017-10-20 10:31:53 -07:00 committed by Andrea Campanella
parent e56134132b
commit 3e7f145b90
12 changed files with 205 additions and 52 deletions

View File

@ -73,19 +73,6 @@ public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHand
}
try {
if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) {
log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
return false;
}
} catch (InterruptedException | ExecutionException e) {
log.error("Exception while deploying pipeconf to {}", deviceId, e);
return false;
}
try {
// It would make more sense to init the stream channel once the client
// is created, but P4runtime would reject any command if a P4info has
// not been set first.
if (!client.initStreamChannel().get()) {
log.warn("Unable to init stream channel to {}.", deviceId);
return false;
@ -95,6 +82,15 @@ public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHand
return false;
}
try {
if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) {
log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
return false;
}
} catch (InterruptedException | ExecutionException e) {
log.error("Exception while deploying pipeconf to {}", deviceId, e);
return false;
}
return true;
}

View File

@ -60,10 +60,30 @@ public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour imple
@Override
public CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole) {
deviceId = handler().data().deviceId();
controller = handler().get(P4RuntimeController.class);
CompletableFuture<MastershipRole> result = new CompletableFuture<>();
log.warn("roleChanged not implemented");
client = controller.getClient(deviceId);
if (client == null || !controller.isReacheable(deviceId)) {
result.complete(MastershipRole.STANDBY);
return result;
}
if (newRole.equals(MastershipRole.MASTER)) {
client.sendMasterArbitrationUpdate().thenAcceptAsync(success -> {
if (!success) {
log.warn("Device {} arbitration failed", deviceId);
result.complete(MastershipRole.STANDBY);
} else {
result.complete(MastershipRole.MASTER);
// TODO.
}
});
} else {
// Since we don't need to do anything, we can complete it directly
// Spec: The client with the highest election id is referred to as the
// "master", while all other clients are referred to as "slaves".
result.complete(newRole);
}
return result;
}
}

View File

@ -156,5 +156,12 @@ public interface P4RuntimeClient {
*/
void shutdown();
/**
* Sends a master arbitration update to the device.
*
* @return a completable future containing true if the operation was successful; false otherwise
*/
CompletableFuture<Boolean> sendMasterArbitrationUpdate();
// TODO: work in progress.
}

View File

@ -75,4 +75,11 @@ public interface P4RuntimeController extends ListenerService<P4RuntimeEvent, P4R
* @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
*/
boolean isReacheable(DeviceId deviceId);
/**
* Gets new election id for device arbitration request.
*
* @return the election id
*/
long getNewMasterElectionId();
}

View File

@ -33,7 +33,11 @@ public final class P4RuntimeEvent extends AbstractEvent<P4RuntimeEvent.Type, P4R
* A packet-in.
*/
PACKET_IN,
// TODO: add mastership, device as soon as we define those.
/**
* Arbitration reply.
*/
ARBITRATION,
}
public P4RuntimeEvent(Type type, P4RuntimeEventSubject subject) {

View File

@ -0,0 +1,58 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.p4runtime.ctl;
import org.onosproject.net.MastershipRole;
import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
import p4.P4RuntimeOuterClass.Uint128;
/**
* Default implementation of arbitration in P4Runtime.
*/
public class DefaultArbitration implements P4RuntimeEventSubject {
private MastershipRole role;
private Uint128 electionId;
/**
* Creates arbitration with given role and election id.
*
* @param role the role
* @param electionId the election id
*/
public DefaultArbitration(MastershipRole role, Uint128 electionId) {
this.role = role;
this.electionId = electionId;
}
/**
* Gets the role of this arbitration.
*
* @return the role
*/
public MastershipRole role() {
return role;
}
/**
* Gets election id of this arbitration.
*
* @return the election id
*/
public Uint128 electionId() {
return electionId;
}
}

View File

@ -31,6 +31,7 @@ import io.grpc.stub.StreamObserver;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.util.Tools;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiActionGroup;
import org.onosproject.net.pi.runtime.PiActionGroupMember;
@ -60,6 +61,7 @@ import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
import p4.P4RuntimeOuterClass.StreamMessageRequest;
import p4.P4RuntimeOuterClass.StreamMessageResponse;
import p4.P4RuntimeOuterClass.TableEntry;
import p4.P4RuntimeOuterClass.Uint128;
import p4.P4RuntimeOuterClass.Update;
import p4.P4RuntimeOuterClass.WriteRequest;
import p4.config.P4InfoOuterClass.P4Info;
@ -74,6 +76,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -104,6 +107,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
WriteOperationType.MODIFY, Update.Type.MODIFY,
WriteOperationType.DELETE, Update.Type.DELETE
);
private static final String ARBITRATION_RESULT_MASTER = "Is master";
private final Logger log = getLogger(getClass());
@ -117,6 +121,9 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
private final Lock writeLock = new ReentrantLock();
private final StreamObserver<StreamMessageRequest> streamRequestObserver;
private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
protected Uint128 p4RuntimeElectionId;
/**
* Default constructor.
*
@ -257,34 +264,44 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
"dumpGroups-" + actionProfileId.id());
}
@Override
public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
}
/* Blocking method implementations below */
private boolean doArbitrationUpdate() {
CompletableFuture<Boolean> result = new CompletableFuture<>();
// TODO: currently we use 64-bit Long type for election id, should
// we use 128-bit ?
long nextElectId = controller.getNewMasterElectionId();
Uint128 newElectionId = Uint128.newBuilder()
.setLow(nextElectId)
.build();
MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
.setDeviceId(p4DeviceId)
.setElectionId(newElectionId)
.build();
StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
.setArbitration(arbitrationUpdate)
.build();
log.debug("Sending arbitration update to {} with election id {}...",
deviceId, newElectionId);
arbitrationUpdateMap.put(newElectionId, result);
try {
streamRequestObserver.onNext(requestMsg);
return result.get();
} catch (InterruptedException | ExecutionException | StatusRuntimeException e) {
log.warn("Arbitration update failed for {} due to {}", deviceId, e);
arbitrationUpdateMap.remove(newElectionId);
return false;
}
}
private boolean doInitStreamChannel() {
// To listen for packets and other events, we need to start the RPC.
// Here we do it by sending a master arbitration update.
log.info("initializing stream chanel on {}...", deviceId);
if (!doArbitrationUpdate()) {
log.warn("Unable to initialize stream channel for {}", deviceId);
return false;
} else {
return true;
}
}
private boolean doArbitrationUpdate() {
log.info("Sending arbitration update to {}...", deviceId);
StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
.setArbitration(MasterArbitrationUpdate.newBuilder()
.setDeviceId(p4DeviceId)
.build())
.build();
try {
streamRequestObserver.onNext(requestMsg);
return true;
} catch (StatusRuntimeException e) {
log.warn("Arbitration update failed for {}: {}", deviceId, e);
return false;
}
return doArbitrationUpdate();
}
private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@ -315,6 +332,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
.newBuilder()
.setElectionId(p4RuntimeElectionId)
.setAction(VERIFY_AND_COMMIT)
.addConfigs(pipelineConfig)
.build();
@ -330,7 +348,6 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
PiPipeconf pipeconf) {
WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
@ -350,11 +367,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
writeRequestBuilder
.setDeviceId(p4DeviceId)
/* PI ignores this ElectionId, commenting out for now.
.setElectionId(Uint128.newBuilder()
.setHigh(0)
.setLow(ELECTION_ID)
.build()) */
.setElectionId(p4RuntimeElectionId)
.addAllUpdates(updateMsgs)
.build();
@ -455,8 +468,33 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
}
private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
Uint128 electionId = arbitrationMsg.getElectionId();
CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
if (mastershipFeature == null) {
log.warn("Can't find completable future of election id {}", electionId);
return;
}
this.p4RuntimeElectionId = electionId;
int statusCode = arbitrationMsg.getStatus().getCode();
MastershipRole arbitrationRole;
// arbitration update success
if (statusCode == Status.OK.getCode().value()) {
mastershipFeature.complete(true);
arbitrationRole = MastershipRole.MASTER;
} else {
mastershipFeature.complete(false);
arbitrationRole = MastershipRole.STANDBY;
}
DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
arbitrationEventSubject);
controller.postEvent(event);
}
private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
@ -490,7 +528,6 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
}
private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
try {
for (PiActionGroupMember member : group.members()) {
@ -518,6 +555,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
.setElectionId(p4RuntimeElectionId)
.addAllUpdates(updateMsgs)
.build();
try {
@ -652,7 +690,6 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
}
private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
final ActionProfileGroup actionProfileGroup;
try {
actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
@ -663,6 +700,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
.setElectionId(p4RuntimeElectionId)
.addUpdates(Update.newBuilder()
.setEntity(Entity.newBuilder()
.setActionProfileGroup(actionProfileGroup)

View File

@ -35,6 +35,8 @@ import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.IOException;
@ -55,19 +57,26 @@ public class P4RuntimeControllerImpl
extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
implements P4RuntimeController {
private static final String P4R_ELECTION = "p4runtime-election";
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
// TODO: should use a cache to delete unused locks.
private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
private AtomicCounter electionIdGenerator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
public GrpcController grpcController;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
public StorageService storageService;
@Activate
public void activate() {
eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
log.info("Started");
}
@ -186,6 +195,11 @@ public class P4RuntimeControllerImpl
}
}
@Override
public long getNewMasterElectionId() {
return electionIdGenerator.incrementAndGet();
}
public void postEvent(P4RuntimeEvent event) {
post(event);
}

View File

@ -48,6 +48,7 @@ import org.onosproject.net.pi.runtime.PiActionProfileId;
import p4.P4RuntimeOuterClass.ActionProfileGroup;
import p4.P4RuntimeOuterClass.ActionProfileMember;
import p4.P4RuntimeOuterClass.Entity;
import p4.P4RuntimeOuterClass.Uint128;
import p4.P4RuntimeOuterClass.Update;
import p4.P4RuntimeOuterClass.WriteRequest;
@ -101,6 +102,7 @@ public class P4RuntimeGroupTest {
private static final int SET_EGRESS_PORT_ID = 16794308;
private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
private static final long DEFAULT_TIMEOUT_TIME = 10;
private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
private P4RuntimeClientImpl client;
private P4RuntimeControllerImpl controller;
@ -156,6 +158,7 @@ public class P4RuntimeGroupTest {
client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
grpcChannel,
controller);
client.p4RuntimeElectionId = DEFAULT_ELECTION_ID;
}
@Test
@ -166,6 +169,7 @@ public class P4RuntimeGroupTest {
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
assertEquals(1, result.getDeviceId());
assertEquals(1, result.getUpdatesCount());
assertEquals(DEFAULT_ELECTION_ID, result.getElectionId());
Update update = result.getUpdatesList().get(0);
assertEquals(Update.Type.INSERT, update.getType());
@ -194,6 +198,7 @@ public class P4RuntimeGroupTest {
WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
assertEquals(1, result.getDeviceId());
assertEquals(3, result.getUpdatesCount());
assertEquals(DEFAULT_ELECTION_ID, result.getElectionId());
List<Update> updates = result.getUpdatesList();
for (Update update : updates) {

View File

@ -5,7 +5,7 @@ include_defs(
PROTOBUF_VER = '3.0.2'
GRPC_VER = '1.3.0'
PI_COMMIT = '9fc50cd0a0187eb1346272524d4b8bafb51bb513'
PI_COMMIT = 'a8814a8ac40838a9df83fe47a17a025b69026fcf'
PI_BASEURL = 'https://github.com/p4lang/PI.git'
# Wondering which .proto files to build? Check p4runtime's Makefile:

View File

@ -148,6 +148,10 @@ public class P4RuntimePacketProvider extends AbstractProvider implements PacketP
@Override
public void event(P4RuntimeEvent event) {
if (event.type() != P4RuntimeEvent.Type.PACKET_IN) {
// Not a packet-in event, ignore it.
return;
}
P4RuntimePacketIn eventSubject = (P4RuntimePacketIn) event.subject();
DeviceId deviceId = eventSubject.deviceId();

View File

@ -15,8 +15,8 @@
set -e
BUILD_DIR=~/p4tools
BMV2_COMMIT="4eeb8dad8e8f062636f9d0d8296aa7f288c6f6dd"
PI_COMMIT="9fc50cd0a0187eb1346272524d4b8bafb51bb513"
BMV2_COMMIT="44ac9c21636b00fed660ae8590889d85b5d4df4c"
PI_COMMIT="a8814a8ac40838a9df83fe47a17a025b69026fcf"
P4C_COMMIT="040b931fbfcb7912e3a14cd05df950fbdd49b038"
PROTOBUF_COMMIT="tags/v3.0.2"
GRPC_COMMIT="tags/v1.3.0"