Catch exceptions when processing messages on P4Runtime stream channel

Also, packet I/O test in P4RuntimeTest.

Change-Id: Ib11d7356eef43cd962cf47f8a6fba8fc23ed69be
This commit is contained in:
Carmelo Cascone 2017-07-30 01:56:30 -04:00 committed by Andrea Campanella
parent 0e896a0285
commit a966c34402
3 changed files with 67 additions and 47 deletions

View File

@ -43,7 +43,6 @@ import java.net.URL;
import java.util.concurrent.ExecutionException;
import static org.onlab.util.ImmutableByteSequence.copyFrom;
import static org.onlab.util.ImmutableByteSequence.fit;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
import static org.onosproject.net.pi.runtime.PiPacketOperation.Type.PACKET_OUT;
@ -56,7 +55,7 @@ import static p4.P4RuntimeOuterClass.Update.Type.INSERT;
public class P4RuntimeTest {
private static final String GRPC_SERVER_ADDR = "192.168.56.102";
private static final int GRPC_SERVER_PORT = 55044;
private static final int GRPC_SERVER_PORT = 55001;
private final URL p4InfoUrl = this.getClass().getResource("/bmv2/default.p4info");
private final URL jsonUrl = this.getClass().getResource("/bmv2/default.json");
@ -112,10 +111,10 @@ public class P4RuntimeTest {
.setGroupId(1)
.setType(SELECT)
.addMembers(P4RuntimeOuterClass.ActionProfileGroup.Member.newBuilder()
.setMemberId(1)
.setMemberId(0)
.setWeight(1)
.build())
.setMaxSize(3)
.setMaxSize(1)
.build();
P4RuntimeOuterClass.WriteRequest writeRequest = P4RuntimeOuterClass.WriteRequest.newBuilder()
@ -123,13 +122,13 @@ public class P4RuntimeTest {
.addUpdates(P4RuntimeOuterClass.Update.newBuilder()
.setType(INSERT)
.setEntity(P4RuntimeOuterClass.Entity.newBuilder()
.setActionProfileGroup(groupMsg)
.setActionProfileMember(profileMemberMsg)
.build())
.build())
.addUpdates(P4RuntimeOuterClass.Update.newBuilder()
.setType(INSERT)
.setEntity(P4RuntimeOuterClass.Entity.newBuilder()
.setActionProfileMember(profileMemberMsg)
.setActionProfileGroup(groupMsg)
.build())
.build())
.build();
@ -137,19 +136,23 @@ public class P4RuntimeTest {
stub.write(writeRequest);
}
private void testPacketOut() throws IllegalAccessException, InstantiationException, ExecutionException,
private void testPacketIo() throws IllegalAccessException, InstantiationException, ExecutionException,
InterruptedException, ImmutableByteSequence.ByteSequenceTrimException {
// Emits a packet out trough the CPU_PORT (255), i.e. we should receive the same packet back.
PiPacketOperation packetOperation = PiPacketOperation.builder()
.withData(ImmutableByteSequence.ofOnes(10))
.withData(ImmutableByteSequence.ofOnes(512))
.withType(PACKET_OUT)
.withMetadata(PiPacketMetadata.builder()
.withId(PiPacketMetadataId.of("egress_port"))
.withValue(fit(copyFrom(1), 9))
.withValue(copyFrom((short) 255))
.build())
.build();
assert (client.packetOut(packetOperation, bmv2DefaultPipeconf).get());
// Wait for packet in.
Thread.sleep(1000);
}
private void testDumpTable(String tableName, PiPipeconf pipeconf) throws ExecutionException, InterruptedException {
@ -161,12 +164,9 @@ public class P4RuntimeTest {
public void testBmv2() throws Exception {
createClientAndSetPipelineConfig(bmv2DefaultPipeconf, BMV2_JSON);
testPacketIo();
testDumpTable("table0", bmv2DefaultPipeconf);
// testPacketOut();
testActionProfile(285261835);
testActionProfile(285227860);
}
@Test

View File

@ -16,6 +16,7 @@
package org.onosproject.p4runtime.ctl;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import org.onosproject.event.AbstractEvent;
import org.onosproject.net.DeviceId;
@ -78,5 +79,13 @@ final class DefaultPacketInEvent
public int hashCode() {
return Objects.hashCode(deviceId, operation);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("deviceId", deviceId)
.add("operation", operation)
.toString();
}
}
}

View File

@ -110,7 +110,7 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
this.p4DeviceId = p4DeviceId;
this.controller = controller;
this.cancellableContext = Context.current().withCancellation();
this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
"onos/p4runtime-client-" + deviceId.toString(),
deviceId.toString() + "-%d"));
this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
@ -133,6 +133,9 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
writeLock.lock();
try {
return supplier.get();
} catch (Throwable ex) {
log.error("Exception in P4Runtime client of {}", deviceId, ex);
throw ex;
} finally {
writeLock.unlock();
}
@ -348,6 +351,32 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
return true;
}
private void doPacketIn(PacketIn packetInMsg) {
// Retrieve the pipeconf for this client's device.
PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
if (pipeconfService == null) {
throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
}
final PiPipeconf pipeconf;
if (pipeconfService.ofDevice(deviceId).isPresent() &&
pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
} else {
log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
return;
}
// Decode packet message and post event.
P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetInMsg, pipeconf));
log.debug("Received packet in: {}", event);
controller.postEvent(event);
}
private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
}
/**
* Returns the internal P4 device ID associated with this client.
*
@ -401,39 +430,21 @@ public final class P4RuntimeClientImpl implements P4RuntimeClient {
}
private void doNext(StreamMessageResponse message) {
log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
switch (message.getUpdateCase()) {
case PACKET:
// Packet-in
PacketIn packetIn = message.getPacket();
// Retrieve the pipeconf for the specific device
PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
if (pipeconfService == null) {
throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
}
final PiPipeconf pipeconf;
if (pipeconfService.ofDevice(deviceId).isPresent() &&
pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
} else {
log.warn("Unable to get the pipeconf of the {}. Can't handle packet in", deviceId);
try {
log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
switch (message.getUpdateCase()) {
case PACKET:
// Packet-in
doPacketIn(message.getPacket());
return;
}
//decode the packet and generate a corresponding p4Runtime event containing the PiPacketOperation
P4RuntimeEvent event =
new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetIn, pipeconf));
//posting the event upwards
controller.postEvent(event);
return;
case ARBITRATION:
throw new UnsupportedOperationException("Arbitration not implemented.");
default:
log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
case ARBITRATION:
doArbitrationUpdateFromDevice(message.getArbitration());
return;
default:
log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
}
} catch (Throwable ex) {
log.error("Exception while processing stream channel message from {}", deviceId, ex);
}
}