mirror of
https://github.com/opennetworkinglab/onos.git
synced 2026-05-05 20:26:16 +02:00
ONOS-6561 BMv2 handshaker via P4Runtime
+ support fort device-specific default pipeconf + improvements to P4runtime and gRPC protocol stuff Change-Id: I8986fce3959df564454ea3d31859860f61eabcae
This commit is contained in:
parent
226cb318c4
commit
59f57decd1
@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.net.pi.model;
|
||||
|
||||
import org.onosproject.net.driver.HandlerBehaviour;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Behavior to program the pipeline of a device.
|
||||
*/
|
||||
public interface PiPipelineProgrammable extends HandlerBehaviour {
|
||||
/**
|
||||
* Deploys the given pipeconf to the device.
|
||||
*
|
||||
* @param pipeconf pipeconf
|
||||
* @return true if the operation was successful, false otherwise
|
||||
*/
|
||||
// TODO: return an explanation of why things went wrong, and the status of the device.
|
||||
CompletableFuture<Boolean> deployPipeconf(PiPipeconf pipeconf);
|
||||
|
||||
/**
|
||||
* Returns the default pipeconf for ths device, to be used when any other pipeconf is not available.
|
||||
*
|
||||
* @return optional pipeconf
|
||||
*/
|
||||
Optional<PiPipeconf> getDefaultPipeconf();
|
||||
}
|
||||
@ -1,8 +1,32 @@
|
||||
GRPC_DEPS = [
|
||||
'//incubator/grpc-dependencies:grpc-core-repkg-1.3.0',
|
||||
'//lib:grpc-protobuf-1.3.0',
|
||||
'//lib:grpc-protobuf-lite-1.3.0',
|
||||
'//lib:grpc-stub-1.3.0',
|
||||
'//lib:grpc-netty-1.3.0',
|
||||
'//lib:grpc-auth-1.3.0',
|
||||
'//lib:google-instrumentation-0.3.0',
|
||||
'//lib:protobuf-java-3.0.2',
|
||||
# Lazily adding all netty-related packages.
|
||||
# Some of them might not be necessary.
|
||||
'//lib:netty',
|
||||
'//lib:netty-buffer',
|
||||
'//lib:netty-codec',
|
||||
'//lib:netty-codec-http',
|
||||
'//lib:netty-codec-http2',
|
||||
'//lib:netty-common',
|
||||
'//lib:netty-handler',
|
||||
'//lib:netty-transport',
|
||||
'//lib:netty-transport-native-epoll',
|
||||
'//lib:netty-resolver',
|
||||
]
|
||||
|
||||
COMPILE_DEPS = [
|
||||
'//lib:CORE_DEPS',
|
||||
'//protocols/grpc/api:onos-protocols-grpc-api',
|
||||
'//lib:grpc-core-1.3.0',
|
||||
'//lib:grpc-stub-1.3.0'
|
||||
'//protocols/p4runtime/api:onos-protocols-p4runtime-api',
|
||||
'//incubator/bmv2/model:onos-incubator-bmv2-model',
|
||||
'//incubator/grpc-dependencies:grpc-core-repkg-1.3.0',
|
||||
'//lib:grpc-netty-1.3.0',
|
||||
]
|
||||
|
||||
TEST_DEPS = [
|
||||
@ -12,11 +36,14 @@ TEST_DEPS = [
|
||||
|
||||
BUNDLES = [
|
||||
':onos-drivers-bmv2',
|
||||
'//lib:grpc-core-1.3.0',
|
||||
'//lib:grpc-stub-1.3.0',
|
||||
'//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
|
||||
'//protocols/p4runtime/api:onos-protocols-p4runtime-api',
|
||||
'//protocols/p4runtime/ctl:onos-protocols-p4runtime-ctl',
|
||||
'//protocols/grpc/api:onos-protocols-grpc-api',
|
||||
'//protocols/grpc/ctl:onos-protocols-grpc-ctl',
|
||||
]
|
||||
'//protocols/grpc/proto:onos-protocols-grpc-proto',
|
||||
'//incubator/bmv2/model:onos-incubator-bmv2-model',
|
||||
] + GRPC_DEPS
|
||||
|
||||
osgi_jar_with_tests(
|
||||
deps = COMPILE_DEPS,
|
||||
|
||||
@ -0,0 +1,121 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.drivers.bmv2;
|
||||
|
||||
import com.google.common.collect.ImmutableBiMap;
|
||||
import org.onlab.util.ImmutableByteSequence;
|
||||
import org.onosproject.net.PortNumber;
|
||||
import org.onosproject.net.driver.AbstractHandlerBehaviour;
|
||||
import org.onosproject.net.flow.TrafficTreatment;
|
||||
import org.onosproject.net.flow.criteria.Criterion;
|
||||
import org.onosproject.net.flow.instructions.Instruction;
|
||||
import org.onosproject.net.flow.instructions.Instructions;
|
||||
import org.onosproject.net.pi.model.PiPipeconf;
|
||||
import org.onosproject.net.pi.model.PiPipelineInterpreter;
|
||||
import org.onosproject.net.pi.runtime.PiAction;
|
||||
import org.onosproject.net.pi.runtime.PiActionId;
|
||||
import org.onosproject.net.pi.runtime.PiActionParam;
|
||||
import org.onosproject.net.pi.runtime.PiActionParamId;
|
||||
import org.onosproject.net.pi.runtime.PiHeaderFieldId;
|
||||
import org.onosproject.net.pi.runtime.PiTableAction;
|
||||
import org.onosproject.net.pi.runtime.PiTableId;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.onosproject.net.PortNumber.CONTROLLER;
|
||||
|
||||
/**
|
||||
* Interpreter implementation for the default pipeconf.
|
||||
*/
|
||||
public class Bmv2DefaultInterpreter extends AbstractHandlerBehaviour implements PiPipelineInterpreter {
|
||||
private static final String TABLE0 = "table0";
|
||||
private static final String SEND_TO_CPU = "send_to_cpu_0";
|
||||
private static final String PORT = "port";
|
||||
private static final String DROP = "_drop_0";
|
||||
private static final String SET_EGRESS_PORT = "set_egress_port_0";
|
||||
|
||||
private static final PiHeaderFieldId IN_PORT_ID = PiHeaderFieldId.of("standard_metadata", "ingress_port");
|
||||
private static final PiHeaderFieldId ETH_DST_ID = PiHeaderFieldId.of("ethernet_t", "dstAddr");
|
||||
private static final PiHeaderFieldId ETH_SRC_ID = PiHeaderFieldId.of("ethernet_t", "srcAddr");
|
||||
private static final PiHeaderFieldId ETH_TYPE_ID = PiHeaderFieldId.of("ethernet_t", "etherType");
|
||||
|
||||
private static final ImmutableBiMap<Criterion.Type, PiHeaderFieldId> CRITERION_MAP = ImmutableBiMap.of(
|
||||
Criterion.Type.IN_PORT, IN_PORT_ID,
|
||||
Criterion.Type.ETH_DST, ETH_DST_ID,
|
||||
Criterion.Type.ETH_SRC, ETH_SRC_ID,
|
||||
Criterion.Type.ETH_TYPE, ETH_TYPE_ID);
|
||||
|
||||
private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = ImmutableBiMap.of(
|
||||
0, PiTableId.of(TABLE0));
|
||||
|
||||
@Override
|
||||
public PiTableAction mapTreatment(TrafficTreatment treatment, PiPipeconf pipeconf) throws PiInterpreterException {
|
||||
|
||||
if (treatment.allInstructions().size() == 0) {
|
||||
// No instructions means drop for us.
|
||||
return actionWithName(DROP);
|
||||
} else if (treatment.allInstructions().size() > 1) {
|
||||
// Otherwise, we understand treatments with only 1 instruction.
|
||||
throw new PiPipelineInterpreter.PiInterpreterException("Treatment has multiple instructions");
|
||||
}
|
||||
|
||||
Instruction instruction = treatment.allInstructions().get(0);
|
||||
|
||||
switch (instruction.type()) {
|
||||
case OUTPUT:
|
||||
Instructions.OutputInstruction outInstruction = (Instructions.OutputInstruction) instruction;
|
||||
PortNumber port = outInstruction.port();
|
||||
if (!port.isLogical()) {
|
||||
PiAction.builder()
|
||||
.withId(PiActionId.of(SET_EGRESS_PORT))
|
||||
.withParameter(new PiActionParam(PiActionParamId.of(PORT),
|
||||
ImmutableByteSequence.copyFrom(port.toLong())))
|
||||
.build();
|
||||
} else if (port.equals(CONTROLLER)) {
|
||||
return actionWithName(SEND_TO_CPU);
|
||||
} else {
|
||||
throw new PiInterpreterException("Egress on logical port not supported: " + port);
|
||||
}
|
||||
case NOACTION:
|
||||
return actionWithName(DROP);
|
||||
default:
|
||||
throw new PiInterpreterException("Instruction type not supported: " + instruction.type().name());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an action instance with no runtime parameters.
|
||||
*/
|
||||
private PiAction actionWithName(String name) {
|
||||
return PiAction.builder().withId(PiActionId.of(name)).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<PiHeaderFieldId> mapCriterionType(Criterion.Type type) {
|
||||
return Optional.ofNullable(CRITERION_MAP.get(type));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Criterion.Type> mapPiHeaderFieldId(PiHeaderFieldId headerFieldId) {
|
||||
return Optional.ofNullable(CRITERION_MAP.inverse().get(headerFieldId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<PiTableId> mapFlowRuleTableId(int flowRuleTableId) {
|
||||
return Optional.ofNullable(TABLE_MAP.get(flowRuleTableId));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.drivers.bmv2;
|
||||
|
||||
import com.eclipsesource.json.Json;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.onosproject.bmv2.model.Bmv2PipelineModelParser;
|
||||
import org.onosproject.net.driver.Behaviour;
|
||||
import org.onosproject.net.pi.model.PiPipeconf;
|
||||
import org.onosproject.net.pi.model.PiPipeconfId;
|
||||
import org.onosproject.net.pi.model.PiPipelineInterpreter;
|
||||
import org.onosproject.net.pi.model.PiPipelineModel;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Implementation of the default pipeconf for a BMv2 device.
|
||||
*/
|
||||
public final class Bmv2DefaultPipeconf implements PiPipeconf {
|
||||
|
||||
private static final String PIPECONF_ID = "bmv2-default-pipeconf";
|
||||
private static final String JSON_PATH = "/default.json";
|
||||
private static final String P4INFO_PATH = "/default.p4info";
|
||||
|
||||
private final PiPipeconfId id;
|
||||
private final PiPipelineModel pipelineModel;
|
||||
private final InputStream jsonConfigStream = this.getClass().getResourceAsStream(JSON_PATH);
|
||||
private final InputStream p4InfoStream = this.getClass().getResourceAsStream(P4INFO_PATH);
|
||||
private final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours;
|
||||
|
||||
Bmv2DefaultPipeconf() {
|
||||
this.id = new PiPipeconfId(PIPECONF_ID);
|
||||
try {
|
||||
this.pipelineModel = Bmv2PipelineModelParser.parse(
|
||||
Json.parse(new BufferedReader(new InputStreamReader(jsonConfigStream))).asObject());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
this.behaviours = ImmutableMap.of(
|
||||
PiPipelineInterpreter.class, Bmv2DefaultInterpreter.class
|
||||
// TODO: reuse default single table pipeliner.
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PiPipeconfId id() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PiPipelineModel pipelineModel() {
|
||||
return pipelineModel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Class<? extends Behaviour>> behaviours() {
|
||||
return behaviours.keySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Class<? extends Behaviour>> implementation(Class<? extends Behaviour> behaviour) {
|
||||
return Optional.ofNullable(behaviours.get(behaviour));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBehaviour(Class<? extends Behaviour> behaviourClass) {
|
||||
return behaviours.containsKey(behaviourClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<InputStream> extension(ExtensionType type) {
|
||||
|
||||
switch (type) {
|
||||
case BMV2_JSON:
|
||||
return Optional.of(jsonConfigStream);
|
||||
case P4_INFO_TEXT:
|
||||
return Optional.of(p4InfoStream);
|
||||
default:
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -16,17 +16,14 @@
|
||||
|
||||
package org.onosproject.drivers.bmv2;
|
||||
|
||||
import org.onosproject.grpc.api.GrpcChannelId;
|
||||
import org.onosproject.grpc.api.GrpcController;
|
||||
import org.onosproject.grpc.api.GrpcServiceId;
|
||||
import org.onosproject.grpc.api.GrpcStreamObserverId;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.MastershipRole;
|
||||
import org.onosproject.net.device.DeviceHandshaker;
|
||||
import org.onosproject.net.driver.AbstractHandlerBehaviour;
|
||||
import org.onosproject.net.driver.DriverData;
|
||||
import org.onosproject.net.key.DeviceKeyId;
|
||||
import org.onosproject.net.key.DeviceKeyService;
|
||||
import org.onosproject.p4runtime.api.P4RuntimeController;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
@ -34,63 +31,71 @@ import java.util.concurrent.CompletableFuture;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
/**
|
||||
* Implementation of the DeviceHandshaker for the BMv2 softswitch.
|
||||
* Implementation of DeviceHandshaker for BMv2.
|
||||
*/
|
||||
//TODO consider abstract class with empty connect method and
|
||||
//the implementation into a protected one for reusability.
|
||||
//FIXME fill method bodies, used for testing.
|
||||
public class Bmv2Handshaker extends AbstractHandlerBehaviour
|
||||
implements DeviceHandshaker {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
// TODO: consider abstract class with empty connect method and implementation into a protected one for reusability.
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> connect() {
|
||||
GrpcController controller = handler().get(GrpcController.class);
|
||||
return CompletableFuture.supplyAsync(this::doConnect);
|
||||
}
|
||||
|
||||
private boolean doConnect() {
|
||||
|
||||
P4RuntimeController controller = handler().get(P4RuntimeController.class);
|
||||
|
||||
DeviceId deviceId = handler().data().deviceId();
|
||||
|
||||
CompletableFuture<Boolean> result = new CompletableFuture<>();
|
||||
DeviceKeyService deviceKeyService = handler().get(DeviceKeyService.class);
|
||||
// DeviceKeyService deviceKeyService = handler().get(DeviceKeyService.class);
|
||||
DriverData data = data();
|
||||
//Retrieving information from the driver data.
|
||||
log.info("protocol bmv2, ip {}, port {}, key {}", data.value("p4runtime_ip"),
|
||||
data.value("p4runtime_port"),
|
||||
deviceKeyService.getDeviceKey(DeviceKeyId.deviceKeyId(data.value("p4runtime_key")))
|
||||
.asUsernamePassword().username());
|
||||
|
||||
log.info("protocol gnmi, ip {}, port {}, key {}", data.value("gnmi_ip"), data.value("gnmi_port"),
|
||||
deviceKeyService.getDeviceKey(DeviceKeyId.deviceKeyId(data.value("gnmi_key")))
|
||||
.asUsernamePassword().username());
|
||||
result.complete(true);
|
||||
String serverAddr = data.value("p4runtime_ip");
|
||||
int serverPort = Integer.valueOf(data.value("p4runtime_port"));
|
||||
int p4DeviceId = Integer.valueOf(data.value("p4runtime_deviceId"));
|
||||
|
||||
//we know we need packet in so we register the observer.
|
||||
GrpcChannelId channelId = GrpcChannelId.of(deviceId, "bmv2");
|
||||
GrpcServiceId serviceId = GrpcServiceId.of(channelId, "p4runtime");
|
||||
GrpcStreamObserverId observerId = GrpcStreamObserverId.of(serviceId,
|
||||
Bmv2PacketProgrammable.class.getSimpleName());
|
||||
controller.addObserver(observerId, new Bmv2PacketInObserverHandler());
|
||||
return result;
|
||||
ManagedChannelBuilder channelBuilder = NettyChannelBuilder
|
||||
.forAddress(serverAddr, serverPort)
|
||||
.usePlaintext(true);
|
||||
|
||||
if (!controller.createClient(deviceId, p4DeviceId, channelBuilder)) {
|
||||
log.warn("Unable to create P4runtime client for {}", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: gNMI handling
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> disconnect() {
|
||||
CompletableFuture<Boolean> result = new CompletableFuture<>();
|
||||
result.complete(true);
|
||||
return result;
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
P4RuntimeController controller = handler().get(P4RuntimeController.class);
|
||||
DeviceId deviceId = handler().data().deviceId();
|
||||
controller.removeClient(deviceId);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isReachable() {
|
||||
CompletableFuture<Boolean> result = new CompletableFuture<>();
|
||||
result.complete(true);
|
||||
return result;
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
P4RuntimeController controller = handler().get(P4RuntimeController.class);
|
||||
DeviceId deviceId = handler().data().deviceId();
|
||||
return controller.isReacheable(deviceId);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole) {
|
||||
CompletableFuture<MastershipRole> result = new CompletableFuture<>();
|
||||
log.warn("roleChanged not implemented");
|
||||
result.complete(MastershipRole.MASTER);
|
||||
// TODO.
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,79 +0,0 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.drivers.bmv2;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.onosproject.grpc.api.GrpcObserverHandler;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
/**
|
||||
* Sample Implementation of a PacketInObserverHandler.
|
||||
* TODO refactor and actually use.
|
||||
*/
|
||||
public class Bmv2PacketInObserverHandler implements GrpcObserverHandler {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
//private final AbstractStub asyncStub;
|
||||
|
||||
//FIXME put at object due to p4Runtime compilation problems to be fixed.
|
||||
private StreamObserver<Object> requestStreamObserver;
|
||||
|
||||
@Override
|
||||
public void bindObserver(ManagedChannel channel) {
|
||||
|
||||
//asyncStub = ProtoGeneratedClass.newStub(channel);
|
||||
|
||||
//reqeustStreamObserver = asyncStub.MethodName(new PacketInObserver());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<StreamObserver> requestStreamObserver() {
|
||||
return Optional.of(requestStreamObserver);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeObserver() {
|
||||
//this should complete the two way streaming
|
||||
requestStreamObserver.onCompleted();
|
||||
}
|
||||
|
||||
private class PacketInObserver implements StreamObserver<Object> {
|
||||
|
||||
@Override
|
||||
public void onNext(Object o) {
|
||||
log.info("onNext: {}", o.toString());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -16,40 +16,32 @@
|
||||
|
||||
package org.onosproject.drivers.bmv2;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.onosproject.grpc.api.GrpcChannelId;
|
||||
import org.onosproject.grpc.api.GrpcController;
|
||||
import org.onosproject.grpc.api.GrpcObserverHandler;
|
||||
import org.onosproject.grpc.api.GrpcServiceId;
|
||||
import org.onosproject.grpc.api.GrpcStreamObserverId;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.driver.AbstractHandlerBehaviour;
|
||||
import org.onosproject.net.driver.DriverHandler;
|
||||
import org.onosproject.net.packet.OutboundPacket;
|
||||
import org.onosproject.net.packet.PacketProgrammable;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Packet Programmable behaviour for BMv2 devices.
|
||||
*/
|
||||
public class Bmv2PacketProgrammable extends AbstractHandlerBehaviour implements PacketProgrammable {
|
||||
|
||||
@Override
|
||||
public void emit(OutboundPacket packet) {
|
||||
DriverHandler handler = handler();
|
||||
GrpcController controller = handler.get(GrpcController.class);
|
||||
DeviceId deviceId = handler.data().deviceId();
|
||||
GrpcChannelId channelId = GrpcChannelId.of(deviceId, "bmv2");
|
||||
GrpcServiceId serviceId = GrpcServiceId.of(channelId, "p4runtime");
|
||||
GrpcStreamObserverId observerId = GrpcStreamObserverId.of(serviceId,
|
||||
this.getClass().getSimpleName());
|
||||
Optional<GrpcObserverHandler> manager = controller.getObserverManager(observerId);
|
||||
if (!manager.isPresent()) {
|
||||
//this is the first time the behaviour is called
|
||||
controller.addObserver(observerId, new Bmv2PacketInObserverHandler());
|
||||
}
|
||||
//other already registered the observer for us.
|
||||
Optional<StreamObserver> observer = manager.get().requestStreamObserver();
|
||||
observer.ifPresent(objectStreamObserver -> objectStreamObserver.onNext(packet));
|
||||
// TODO: implement using P4runtime client.
|
||||
// DriverHandler handler = handler();
|
||||
// GrpcController controller = handler.get(GrpcController.class);
|
||||
// DeviceId deviceId = handler.data().deviceId();
|
||||
// GrpcChannelId channelId = GrpcChannelId.of(deviceId, "bmv2");
|
||||
// GrpcServiceId serviceId = GrpcServiceId.of(channelId, "p4runtime");
|
||||
// GrpcStreamObserverId observerId = GrpcStreamObserverId.of(serviceId,
|
||||
// this.getClass().getSimpleName());
|
||||
// Optional<GrpcObserverHandler> manager = controller.getObserverManager(observerId);
|
||||
// if (!manager.isPresent()) {
|
||||
// //this is the first time the behaviour is called
|
||||
// controller.addObserver(observerId, new Bmv2PacketInObserverHandler());
|
||||
// }
|
||||
// //other already registered the observer for us.
|
||||
// Optional<StreamObserver> observer = manager.get().requestStreamObserver();
|
||||
// observer.ifPresent(objectStreamObserver -> objectStreamObserver.onNext(packet));
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,107 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.drivers.bmv2;
|
||||
|
||||
import org.onlab.util.SharedExecutors;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.driver.AbstractHandlerBehaviour;
|
||||
import org.onosproject.net.pi.model.PiPipeconf;
|
||||
import org.onosproject.net.pi.model.PiPipelineProgrammable;
|
||||
import org.onosproject.p4runtime.api.P4RuntimeClient;
|
||||
import org.onosproject.p4runtime.api.P4RuntimeController;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
|
||||
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
/**
|
||||
* Implementation of the PiPipelineProgrammable for BMv2.
|
||||
*/
|
||||
public class Bmv2PipelineProgrammable extends AbstractHandlerBehaviour implements PiPipelineProgrammable {
|
||||
|
||||
private static final PiPipeconf DEFAULT_PIPECONF = new Bmv2DefaultPipeconf();
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> deployPipeconf(PiPipeconf pipeconf) {
|
||||
|
||||
CompletableFuture<Boolean> result = new CompletableFuture<>();
|
||||
|
||||
SharedExecutors.getPoolThreadExecutor().submit(() -> result.complete(doDeployConfig(pipeconf)));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private boolean doDeployConfig(PiPipeconf pipeconf) {
|
||||
|
||||
P4RuntimeController controller = handler().get(P4RuntimeController.class);
|
||||
|
||||
DeviceId deviceId = handler().data().deviceId();
|
||||
|
||||
if (!controller.hasClient(deviceId)) {
|
||||
log.warn("Unable to find client for {}, aborting pipeconf deploy", deviceId);
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
P4RuntimeClient client = controller.getClient(deviceId);
|
||||
|
||||
if (!pipeconf.extension(BMV2_JSON).isPresent()) {
|
||||
log.warn("Missing BMv2 JSON config in pipeconf {}, aborting pipeconf deploy", pipeconf.id());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!pipeconf.extension(P4_INFO_TEXT).isPresent()) {
|
||||
log.warn("Missing P4Info in pipeconf {}, aborting pipeconf deploy", pipeconf.id());
|
||||
return false;
|
||||
}
|
||||
|
||||
InputStream p4InfoStream = pipeconf.extension(P4_INFO_TEXT).get();
|
||||
InputStream jsonStream = pipeconf.extension(BMV2_JSON).get();
|
||||
|
||||
try {
|
||||
if (!client.setPipelineConfig(p4InfoStream, jsonStream).get()) {
|
||||
log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// It would be more logical to have this performed at device handshake, but P4runtime would reject any
|
||||
// command if a P4info has not been set first.
|
||||
if (!client.initStreamChannel().get()) {
|
||||
log.warn("Unable to init stream channel to {}.", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<PiPipeconf> getDefaultPipeconf() {
|
||||
return Optional.of(DEFAULT_PIPECONF);
|
||||
}
|
||||
}
|
||||
@ -20,6 +20,8 @@
|
||||
impl="org.onosproject.drivers.bmv2.Bmv2Handshaker"/>
|
||||
<behaviour api="org.onosproject.net.packet.PacketProgrammable"
|
||||
impl="org.onosproject.drivers.bmv2.Bmv2PacketProgrammable"/>
|
||||
<behaviour api="org.onosproject.net.pi.model.PiPipelineProgrammable"
|
||||
impl="org.onosproject.drivers.bmv2.Bmv2PipelineProgrammable"/>
|
||||
</driver>
|
||||
</drivers>
|
||||
|
||||
|
||||
2697
drivers/bmv2/src/main/resources/default.json
Normal file
2697
drivers/bmv2/src/main/resources/default.json
Normal file
File diff suppressed because it is too large
Load Diff
113
drivers/bmv2/src/main/resources/default.p4info
Normal file
113
drivers/bmv2/src/main/resources/default.p4info
Normal file
@ -0,0 +1,113 @@
|
||||
tables {
|
||||
preamble {
|
||||
id: 33617813
|
||||
name: "table0"
|
||||
alias: "table0"
|
||||
}
|
||||
match_fields {
|
||||
id: 1
|
||||
name: "standard_metadata.ingress_port"
|
||||
bitwidth: 9
|
||||
match_type: TERNARY
|
||||
}
|
||||
match_fields {
|
||||
id: 2
|
||||
name: "hdr.ethernet.dstAddr"
|
||||
bitwidth: 48
|
||||
match_type: TERNARY
|
||||
}
|
||||
match_fields {
|
||||
id: 3
|
||||
name: "hdr.ethernet.srcAddr"
|
||||
bitwidth: 48
|
||||
match_type: TERNARY
|
||||
}
|
||||
match_fields {
|
||||
id: 4
|
||||
name: "hdr.ethernet.etherType"
|
||||
bitwidth: 16
|
||||
match_type: TERNARY
|
||||
}
|
||||
action_refs {
|
||||
id: 16794308
|
||||
}
|
||||
action_refs {
|
||||
id: 16829080
|
||||
}
|
||||
action_refs {
|
||||
id: 16793508
|
||||
}
|
||||
action_refs {
|
||||
id: 16800567
|
||||
annotations: "@defaultonly()"
|
||||
}
|
||||
direct_resource_ids: 301990488
|
||||
size: 1024
|
||||
with_entry_timeout: true
|
||||
}
|
||||
actions {
|
||||
preamble {
|
||||
id: 16794308
|
||||
name: "set_egress_port"
|
||||
alias: "set_egress_port"
|
||||
}
|
||||
params {
|
||||
id: 1
|
||||
name: "port"
|
||||
bitwidth: 9
|
||||
}
|
||||
}
|
||||
actions {
|
||||
preamble {
|
||||
id: 16829080
|
||||
name: "send_to_cpu"
|
||||
alias: "send_to_cpu"
|
||||
}
|
||||
}
|
||||
actions {
|
||||
preamble {
|
||||
id: 16793508
|
||||
name: "drop"
|
||||
alias: "drop"
|
||||
}
|
||||
}
|
||||
actions {
|
||||
preamble {
|
||||
id: 16800567
|
||||
name: "NoAction"
|
||||
alias: "NoAction"
|
||||
}
|
||||
}
|
||||
counters {
|
||||
preamble {
|
||||
id: 302025528
|
||||
name: "port_counters_control.egress_port_counter"
|
||||
alias: "egress_port_counter"
|
||||
}
|
||||
spec {
|
||||
unit: PACKETS
|
||||
}
|
||||
size: 254
|
||||
}
|
||||
counters {
|
||||
preamble {
|
||||
id: 301999025
|
||||
name: "port_counters_control.ingress_port_counter"
|
||||
alias: "ingress_port_counter"
|
||||
}
|
||||
spec {
|
||||
unit: PACKETS
|
||||
}
|
||||
size: 254
|
||||
}
|
||||
direct_counters {
|
||||
preamble {
|
||||
id: 301990488
|
||||
name: "table0_counter"
|
||||
alias: "table0_counter"
|
||||
}
|
||||
spec {
|
||||
unit: PACKETS
|
||||
}
|
||||
direct_table_id: 33617813
|
||||
}
|
||||
@ -57,7 +57,7 @@ public interface GrpcController {
|
||||
Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId);
|
||||
|
||||
/**
|
||||
* Tries to connect to a specific gRPC device, if the connection is successful
|
||||
* Tries to connect to a specific gRPC server, if the connection is successful
|
||||
* returns the ManagedChannel. This method blocks until the channel is setup or a timeout expires.
|
||||
* By default the timeout is 20 seconds. If the timeout expires and thus the channel can't be set up
|
||||
* a IOException is thrown.
|
||||
@ -83,6 +83,15 @@ public interface GrpcController {
|
||||
*/
|
||||
Map<GrpcChannelId, ManagedChannel> getChannels();
|
||||
|
||||
/**
|
||||
* Returns true if the channel associated with the given identifier is open, i.e. the server is able to successfully
|
||||
* responds to RPCs.
|
||||
*
|
||||
* @param channelId channel identifier
|
||||
* @return true if channel is open, false otherwise.
|
||||
*/
|
||||
boolean isChannelOpen(GrpcChannelId channelId);
|
||||
|
||||
/**
|
||||
* Returns all ManagedChannels associated to the given device identifier.
|
||||
*
|
||||
|
||||
@ -1,11 +1,13 @@
|
||||
COMPILE_DEPS = [
|
||||
'//lib:CORE_DEPS',
|
||||
'//protocols/grpc/api:onos-protocols-grpc-api',
|
||||
'//protocols/grpc/proto:onos-protocols-grpc-proto',
|
||||
'//lib:grpc-core-1.3.0',
|
||||
'//lib:grpc-protobuf-1.3.0',
|
||||
'//lib:grpc-stub-1.3.0',
|
||||
'//lib:grpc-netty-1.3.0',
|
||||
'//lib:grpc-auth-1.3.0',
|
||||
'//lib:protobuf-java-3.0.2',
|
||||
]
|
||||
|
||||
TEST_DEPS = [
|
||||
|
||||
@ -19,6 +19,8 @@ package org.onosproject.grpc.ctl;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Deactivate;
|
||||
@ -27,16 +29,20 @@ import org.onosproject.grpc.api.GrpcChannelId;
|
||||
import org.onosproject.grpc.api.GrpcController;
|
||||
import org.onosproject.grpc.api.GrpcObserverHandler;
|
||||
import org.onosproject.grpc.api.GrpcStreamObserverId;
|
||||
import org.onosproject.grpc.ctl.dummy.Dummy;
|
||||
import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Default implementation of the GrpcController.
|
||||
@ -45,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
@Service
|
||||
public class GrpcControllerImpl implements GrpcController {
|
||||
|
||||
private static final int CONNECTION_TIMEOUT_SECONDS = 20;
|
||||
|
||||
public static final Logger log = LoggerFactory
|
||||
.getLogger(GrpcControllerImpl.class);
|
||||
|
||||
@ -87,18 +95,63 @@ public class GrpcControllerImpl implements GrpcController {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder) {
|
||||
public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
|
||||
throws IOException {
|
||||
ManagedChannel channel = channelBuilder.build();
|
||||
|
||||
channel.getState(true);
|
||||
// Forced connection not yet implemented. Use workaround...
|
||||
// channel.getState(true);
|
||||
doDummyMessage(channel);
|
||||
|
||||
channelBuilders.put(channelId, channelBuilder);
|
||||
channels.put(channelId, channel);
|
||||
return channel;
|
||||
}
|
||||
|
||||
private void doDummyMessage(ManagedChannel channel) throws IOException {
|
||||
DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
|
||||
.withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||
try {
|
||||
dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
|
||||
} catch (StatusRuntimeException e) {
|
||||
if (e.getStatus() != Status.UNIMPLEMENTED) {
|
||||
// UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
|
||||
// Hence, channel is open.
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isChannelOpen(GrpcChannelId channelId) {
|
||||
if (!channels.containsKey(channelId)) {
|
||||
log.warn("Can't check if channel open for unknown channel id {}", channelId);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
doDummyMessage(channels.get(channelId));
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnectChannel(GrpcChannelId channelId) {
|
||||
channels.get(channelId).shutdown();
|
||||
if (!channels.containsKey(channelId)) {
|
||||
// Nothing to do.
|
||||
return;
|
||||
}
|
||||
ManagedChannel channel = channels.get(channelId);
|
||||
|
||||
try {
|
||||
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Channel {} didn't shut down in time.");
|
||||
channel.shutdownNow();
|
||||
}
|
||||
|
||||
channels.remove(channelId);
|
||||
channelBuilders.remove(channelId);
|
||||
}
|
||||
|
||||
28
protocols/grpc/proto/BUCK
Normal file
28
protocols/grpc/proto/BUCK
Normal file
@ -0,0 +1,28 @@
|
||||
include_defs(
|
||||
'//bucklets/grpc.bucklet'
|
||||
)
|
||||
|
||||
PROTOC_VER = '3.0.2'
|
||||
GRPC_VER = '1.3.0'
|
||||
|
||||
|
||||
COMPILE_DEPS = [
|
||||
'//lib:CORE_DEPS',
|
||||
'//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
|
||||
'//lib:grpc-stub-' + GRPC_VER,
|
||||
'//lib:grpc-protobuf-' + GRPC_VER,
|
||||
'//lib:protobuf-java-' + PROTOC_VER,
|
||||
]
|
||||
|
||||
|
||||
grpc_jar(
|
||||
proto_match_patterns = ["*.proto"],
|
||||
proto_paths = ["$ONOS_ROOT/protocols/grpc/proto/"],
|
||||
protoc_version = PROTOC_VER,
|
||||
plugin_version = GRPC_VER,
|
||||
deps = COMPILE_DEPS,
|
||||
)
|
||||
|
||||
project_config(
|
||||
src_target = ':onos-protocols-grpc-proto'
|
||||
)
|
||||
12
protocols/grpc/proto/dummy.proto
Normal file
12
protocols/grpc/proto/dummy.proto
Normal file
@ -0,0 +1,12 @@
|
||||
syntax = "proto3";
|
||||
|
||||
option java_package = "org.onosproject.grpc.ctl.dummy";
|
||||
|
||||
package dummy;
|
||||
|
||||
service DummyService {
|
||||
rpc SayHello (DummyMessageThatNoOneWouldReallyUse) returns (DummyMessageThatNoOneWouldReallyUse) {}
|
||||
}
|
||||
|
||||
message DummyMessageThatNoOneWouldReallyUse {
|
||||
}
|
||||
@ -65,4 +65,14 @@ public interface P4RuntimeController extends ListenerService<P4RuntimeEvent, P4R
|
||||
* @return true if client exists, false otherwise.
|
||||
*/
|
||||
boolean hasClient(DeviceId deviceId);
|
||||
|
||||
/**
|
||||
* Returns true if the P4Runtime server running on the given device is reachable, i.e. the channel is open and the
|
||||
* server is able to respond to RPCs, false otherwise. Reachability can be tested only if a client was previously
|
||||
* created using {@link #createClient(DeviceId, int, ManagedChannelBuilder)}, otherwise this method returns false.
|
||||
*
|
||||
* @param deviceId device identifier.
|
||||
* @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
|
||||
*/
|
||||
boolean isReacheable(DeviceId deviceId);
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.ExtensionRegistry;
|
||||
import com.google.protobuf.TextFormat;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
@ -64,6 +65,7 @@ public class P4RuntimeClientImpl implements P4RuntimeClient {
|
||||
private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
|
||||
private ExecutorService executorService;
|
||||
private StreamObserver<StreamMessageRequest> streamRequestObserver;
|
||||
private Context.CancellableContext streamContext;
|
||||
|
||||
|
||||
P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
|
||||
@ -74,8 +76,7 @@ public class P4RuntimeClientImpl implements P4RuntimeClient {
|
||||
this.executorService = executorService;
|
||||
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
|
||||
.withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
|
||||
this.asyncStub = P4RuntimeGrpc.newStub(channel)
|
||||
.withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
|
||||
this.asyncStub = P4RuntimeGrpc.newStub(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -85,21 +86,44 @@ public class P4RuntimeClientImpl implements P4RuntimeClient {
|
||||
|
||||
private boolean doInitStreamChannel() {
|
||||
if (this.streamRequestObserver == null) {
|
||||
this.streamRequestObserver = this.asyncStub.streamChannel(new StreamChannelResponseObserver());
|
||||
|
||||
streamContext = Context.current().withCancellation();
|
||||
streamContext.run(
|
||||
() -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
|
||||
|
||||
// To listen for packets and other events, we need to start the RPC.
|
||||
// Here we do it by sending an empty packet out.
|
||||
try {
|
||||
this.streamRequestObserver.onNext(StreamMessageRequest.newBuilder()
|
||||
.setPacket(PacketOut.getDefaultInstance())
|
||||
.build());
|
||||
} catch (StatusRuntimeException e) {
|
||||
log.warn("Unable to initialize stream channel for {}: {}", deviceId, e);
|
||||
// Here we do it by sending a master arbitration update.
|
||||
if (!doArbitrationUpdate()) {
|
||||
log.warn("Unable to initialize stream channel for {}", deviceId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean doArbitrationUpdate() {
|
||||
|
||||
if (streamRequestObserver == null) {
|
||||
log.error("Null request stream observer for {}", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
StreamMessageRequest initRequest = StreamMessageRequest
|
||||
.newBuilder()
|
||||
.setArbitration(MasterArbitrationUpdate
|
||||
.newBuilder()
|
||||
.setDeviceId(p4DeviceId)
|
||||
.build())
|
||||
.build();
|
||||
streamRequestObserver.onNext(initRequest);
|
||||
return true;
|
||||
} catch (StatusRuntimeException e) {
|
||||
log.warn("Arbitration update failed for {}: {}", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
|
||||
return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
|
||||
@ -168,12 +192,15 @@ public class P4RuntimeClientImpl implements P4RuntimeClient {
|
||||
@Override
|
||||
public void shutdown() {
|
||||
|
||||
if (this.streamRequestObserver != null) {
|
||||
this.streamRequestObserver.onError(new StatusRuntimeException(Status.CANCELLED));
|
||||
this.streamRequestObserver.onCompleted();
|
||||
log.info("Shutting down client for {}...", deviceId);
|
||||
|
||||
if (streamRequestObserver != null) {
|
||||
streamRequestObserver.onCompleted();
|
||||
streamContext.cancel(null);
|
||||
streamContext = null;
|
||||
}
|
||||
|
||||
this.executorService.shutdownNow();
|
||||
this.executorService.shutdown();
|
||||
try {
|
||||
executorService.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
@ -216,12 +243,16 @@ public class P4RuntimeClientImpl implements P4RuntimeClient {
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
log.warn("Error on stream channel for {}: {}", deviceId, throwable);
|
||||
log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
|
||||
// FIXME: we might want to recreate the channel.
|
||||
// In general, we want to be robust against any transient error and, if the channel is open, make sure the
|
||||
// stream channel is always on.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
// TODO: declare the device as disconnected?
|
||||
log.warn("Stream channel for {} has completed", deviceId);
|
||||
// FIXME: same concern as before.
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -39,8 +39,8 @@ import org.slf4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static java.lang.String.format;
|
||||
@ -62,10 +62,11 @@ public class P4RuntimeControllerImpl
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
|
||||
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newConcurrentMap();
|
||||
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newConcurrentMap();
|
||||
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
|
||||
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
|
||||
|
||||
// TODO: should use a cache to delete unused locks.
|
||||
private final Map<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
|
||||
private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
|
||||
|
||||
@Activate
|
||||
public void activate() {
|
||||
@ -85,8 +86,8 @@ public class P4RuntimeControllerImpl
|
||||
checkNotNull(deviceId);
|
||||
checkNotNull(channelBuilder);
|
||||
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
|
||||
deviceLocks.get(deviceId).lock();
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
|
||||
deviceLocks.get(deviceId).writeLock().lock();
|
||||
|
||||
log.info("Creating client for {} (with internal device id {})...", deviceId, p4DeviceId);
|
||||
|
||||
@ -97,11 +98,12 @@ public class P4RuntimeControllerImpl
|
||||
return doCreateClient(deviceId, p4DeviceId, channelBuilder);
|
||||
}
|
||||
} finally {
|
||||
deviceLocks.get(deviceId).unlock();
|
||||
deviceLocks.get(deviceId).writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doCreateClient(DeviceId deviceId, int p4DeviceId, ManagedChannelBuilder channelBuilder) {
|
||||
|
||||
GrpcChannelId channelId = GrpcChannelId.of(deviceId, "p4runtime");
|
||||
|
||||
// Channel defaults.
|
||||
@ -127,43 +129,62 @@ public class P4RuntimeControllerImpl
|
||||
@Override
|
||||
public P4RuntimeClient getClient(DeviceId deviceId) {
|
||||
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
|
||||
deviceLocks.get(deviceId).lock();
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
|
||||
deviceLocks.get(deviceId).readLock().lock();
|
||||
|
||||
try {
|
||||
return clients.get(deviceId);
|
||||
} finally {
|
||||
deviceLocks.get(deviceId).unlock();
|
||||
deviceLocks.get(deviceId).readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeClient(DeviceId deviceId) {
|
||||
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
|
||||
deviceLocks.get(deviceId).lock();
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
|
||||
deviceLocks.get(deviceId).writeLock().lock();
|
||||
|
||||
try {
|
||||
if (clients.containsKey(deviceId)) {
|
||||
clients.get(deviceId).shutdown();
|
||||
grpcController.disconnectChannel(channelIds.get(deviceId));
|
||||
clients.remove(deviceId);
|
||||
channelIds.remove(deviceId);
|
||||
}
|
||||
} finally {
|
||||
deviceLocks.get(deviceId).unlock();
|
||||
deviceLocks.get(deviceId).writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasClient(DeviceId deviceId) {
|
||||
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
|
||||
deviceLocks.get(deviceId).lock();
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
|
||||
deviceLocks.get(deviceId).readLock().lock();
|
||||
|
||||
try {
|
||||
return clients.containsKey(deviceId);
|
||||
} finally {
|
||||
deviceLocks.get(deviceId).unlock();
|
||||
deviceLocks.get(deviceId).readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReacheable(DeviceId deviceId) {
|
||||
|
||||
deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
|
||||
deviceLocks.get(deviceId).readLock().lock();
|
||||
|
||||
try {
|
||||
if (!clients.containsKey(deviceId)) {
|
||||
log.warn("No client for {}, can't check for reachability", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
return grpcController.isChannelOpen(channelIds.get(deviceId));
|
||||
} finally {
|
||||
deviceLocks.get(deviceId).readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -60,7 +60,9 @@ import org.onosproject.net.driver.DefaultDriverHandler;
|
||||
import org.onosproject.net.driver.Driver;
|
||||
import org.onosproject.net.driver.DriverData;
|
||||
import org.onosproject.net.driver.DriverService;
|
||||
import org.onosproject.net.pi.model.PiPipeconf;
|
||||
import org.onosproject.net.pi.model.PiPipeconfId;
|
||||
import org.onosproject.net.pi.model.PiPipelineProgrammable;
|
||||
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
|
||||
import org.onosproject.net.pi.runtime.PiPipeconfService;
|
||||
import org.onosproject.net.provider.AbstractProvider;
|
||||
@ -72,7 +74,6 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -139,12 +140,12 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
|
||||
protected ScheduledExecutorService connectionExecutor
|
||||
= newScheduledThreadPool(CORE_POOL_SIZE,
|
||||
groupedThreads("onos/generaldeviceprovider-device",
|
||||
"connection-executor-%d", log));
|
||||
groupedThreads("onos/generaldeviceprovider-device",
|
||||
"connection-executor-%d", log));
|
||||
protected ScheduledExecutorService portStatsExecutor
|
||||
= newScheduledThreadPool(CORE_POOL_SIZE,
|
||||
groupedThreads("onos/generaldeviceprovider-port-stats",
|
||||
"port-stats-executor-%d", log));
|
||||
groupedThreads("onos/generaldeviceprovider-port-stats",
|
||||
"port-stats-executor-%d", log));
|
||||
|
||||
protected DeviceProviderService providerService;
|
||||
private InternalDeviceListener deviceListener = new InternalDeviceListener();
|
||||
@ -239,7 +240,7 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
modified.thenAcceptAsync(result -> {
|
||||
if (!result) {
|
||||
log.warn("Your device {} port {} status can't be changed to {}",
|
||||
deviceId, portNumber, enable);
|
||||
deviceId, portNumber, enable);
|
||||
}
|
||||
});
|
||||
|
||||
@ -251,13 +252,13 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
|
||||
Driver driver = getDriver(deviceId);
|
||||
return getBehaviour(driver, DeviceHandshaker.class,
|
||||
new DefaultDriverData(driver, deviceId));
|
||||
new DefaultDriverData(driver, deviceId));
|
||||
}
|
||||
|
||||
private PortAdmin getPortAdmin(DeviceId deviceId) {
|
||||
Driver driver = getDriver(deviceId);
|
||||
return getBehaviour(driver, PortAdmin.class,
|
||||
new DefaultDriverData(driver, deviceId));
|
||||
new DefaultDriverData(driver, deviceId));
|
||||
|
||||
}
|
||||
|
||||
@ -267,7 +268,7 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
driver = driverService.getDriver(deviceId);
|
||||
} catch (ItemNotFoundException e) {
|
||||
log.debug("Falling back to configuration to fetch driver " +
|
||||
"for device {}", deviceId);
|
||||
"for device {}", deviceId);
|
||||
driver = driverService.getDriver(
|
||||
cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
|
||||
}
|
||||
@ -298,7 +299,7 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
|
||||
if (providerConfig == null || basicDeviceConfig == null) {
|
||||
log.error("Configuration is NULL: basic config {}, general provider " +
|
||||
"config {}", basicDeviceConfig, providerConfig);
|
||||
"config {}", basicDeviceConfig, providerConfig);
|
||||
} else {
|
||||
log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
|
||||
|
||||
@ -310,7 +311,7 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
|
||||
if (handshaker == null) {
|
||||
log.error("Device {}, with driver {} does not support DeviceHandshaker " +
|
||||
"behaviour, {}", deviceId, driver.name(), driver.behaviours());
|
||||
"behaviour, {}", deviceId, driver.name(), driver.behaviours());
|
||||
return;
|
||||
}
|
||||
//Storing deviceKeyId and all other config values
|
||||
@ -333,13 +334,13 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
ChassisId cid = new ChassisId();
|
||||
SparseAnnotations annotations = DefaultAnnotations.builder()
|
||||
.set(AnnotationKeys.PROTOCOL,
|
||||
providerConfig.protocolsInfo().keySet().toString())
|
||||
providerConfig.protocolsInfo().keySet().toString())
|
||||
.build();
|
||||
DeviceDescription description =
|
||||
new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
|
||||
driver.manufacturer(), driver.hwVersion(),
|
||||
driver.swVersion(), UNKNOWN,
|
||||
cid, false, annotations);
|
||||
driver.manufacturer(), driver.hwVersion(),
|
||||
driver.swVersion(), UNKNOWN,
|
||||
cid, false, annotations);
|
||||
//Empty list of ports
|
||||
List<PortDescription> ports = new ArrayList<>();
|
||||
|
||||
@ -354,27 +355,15 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
ports = deviceDiscovery.discoverPortDetails();
|
||||
}
|
||||
|
||||
Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
|
||||
//Apply the Pipeline configuration and then connect the device
|
||||
if (pipeconfId.isPresent()) {
|
||||
DeviceDescription finalDescription = description;
|
||||
List<PortDescription> finalPorts = ports;
|
||||
piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
|
||||
if (success) {
|
||||
advertiseDevice(deviceId, finalDescription, finalPorts);
|
||||
} else {
|
||||
log.error("Can't merge driver {} with pipeconf {} for device {}, " +
|
||||
"not reporting it to the device manager",
|
||||
driver.name(), pipeconfId.get(), deviceId);
|
||||
}
|
||||
}).exceptionally(ex -> {
|
||||
throw new IllegalStateException(ex);
|
||||
});
|
||||
} else {
|
||||
//No other operation is needed, advertise the device to the core.
|
||||
advertiseDevice(deviceId, description, ports);
|
||||
if (!handlePipeconf(deviceId, driver, driverData)) {
|
||||
// Something went wrong during handling of pipeconf.
|
||||
// We already logged the error.
|
||||
handshaker.disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
advertiseDevice(deviceId, description, ports);
|
||||
|
||||
} else {
|
||||
log.warn("Can't connect to device {}", deviceId);
|
||||
}
|
||||
@ -382,6 +371,66 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
|
||||
* device can be registered to the core, false otherwise.
|
||||
*/
|
||||
private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
|
||||
|
||||
PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
|
||||
driverData);
|
||||
|
||||
if (pipelineProg == null) {
|
||||
// Device is not pipeline programmable.
|
||||
return true;
|
||||
}
|
||||
|
||||
PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
|
||||
// No pipeconf has been associated with this device.
|
||||
// Check if device driver provides a default one.
|
||||
if (pipelineProg.getDefaultPipeconf().isPresent()) {
|
||||
PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
|
||||
log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
|
||||
// Register default one if it is not.
|
||||
// TODO: this should be performed at driver loading.
|
||||
if (!piPipeconfService.getPipeconf(defaultPipeconf.id()).isPresent()) {
|
||||
piPipeconfService.register(defaultPipeconf);
|
||||
}
|
||||
return defaultPipeconf.id();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
if (pipeconfId == null) {
|
||||
log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).orElseThrow(
|
||||
() -> new IllegalStateException("Pipeconf is not registered: " + pipeconfId)
|
||||
);
|
||||
|
||||
|
||||
try {
|
||||
if (!pipelineProg.deployPipeconf(pipeconf).get()) {
|
||||
log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
|
||||
log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
|
||||
driver.name(), deviceId, pipeconfId);
|
||||
return false;
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
|
||||
providerService.deviceConnected(deviceId, description);
|
||||
providerService.updatePorts(deviceId, ports);
|
||||
@ -454,7 +503,7 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
//If we want to connect a p4runtime device with no pipeline
|
||||
if (event.config().isPresent() &&
|
||||
Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
|
||||
PIPELINE_CONFIGURABLE_PROTOCOLS)) {
|
||||
PIPELINE_CONFIGURABLE_PROTOCOLS)) {
|
||||
pipelineConfigured.add(deviceId);
|
||||
}
|
||||
deviceConfigured.add(deviceId);
|
||||
@ -526,15 +575,15 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
// be available we check and base it on the streaming API (e.g. gNMI)
|
||||
if (deviceService.getDevice(event.subject().id()).
|
||||
is(PortStatisticsDiscovery.class)) {
|
||||
portStatsExecutor.scheduleAtFixedRate(exceptionSafe(() ->
|
||||
updatePortStatistics(event.subject().id())),
|
||||
portStatsExecutor.scheduleAtFixedRate(
|
||||
exceptionSafe(() -> updatePortStatistics(event.subject().id())),
|
||||
0, PORT_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
|
||||
updatePortStatistics(event.subject().id());
|
||||
}
|
||||
|
||||
} else if (type.equals(Type.DEVICE_REMOVED)) {
|
||||
connectionExecutor.submit(exceptionSafe(() ->
|
||||
disconnectDevice(event.subject().id())));
|
||||
disconnectDevice(event.subject().id())));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -79,9 +79,13 @@ class ONOSBmv2Switch(Switch):
|
||||
"p4runtime": {
|
||||
"ip": srcIP,
|
||||
"port": self.grpcPort,
|
||||
"deviceId": self.deviceId,
|
||||
"deviceKeyId": "p4runtime:%s" % onosDeviceId
|
||||
}
|
||||
},
|
||||
"piPipeconf": {
|
||||
"piPipeconfId": ""
|
||||
},
|
||||
"basic": {
|
||||
"driver": "bmv2"
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user