[ONOS-6564] Adding PiPeconf behaviours to driver for device.

Initial implementation of PiPipeconfService.
Tests for Initial implementation.

Change-Id: I9dea6fb3015788b8b61060c7f88395c3d45e6ed7
This commit is contained in:
Andrea Campanella 2017-06-26 19:06:43 +02:00
parent 20549d3a3d
commit bc112a960d
10 changed files with 767 additions and 63 deletions

View File

@ -45,7 +45,7 @@ public interface PiPipeconf {
PiPipelineModel pipelineModel();
/**
* Returns all pipeline-specific behaviours defined by this configuration.
* Returns all pipeline-specific behaviour interfaces defined by this configuration.
*
* @return a collection of behaviours
*/

View File

@ -0,0 +1,41 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.pi.runtime;
import com.google.common.annotations.Beta;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.Config;
import org.onosproject.net.pi.model.PiPipeconfId;
/**
* Configuration fot the PiPipeconf susbystem.
*/
@Beta
public class PiPipeconfConfig extends Config<DeviceId> {
public static final String PIPIPECONFID = "piPipeconfId";
@Override
public boolean isValid() {
return hasOnlyFields(PIPIPECONFID);
//TODO will reinstate after synchonization of events
//&& !piPipeconfId().id().equals("");
}
public PiPipeconfId piPipeconfId() {
return new PiPipeconfId(get(PIPIPECONFID, ""));
}
}

View File

@ -22,6 +22,7 @@ import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* A service to manage the configurations of protocol-independent pipelines.
@ -58,14 +59,16 @@ public interface PiPipeconfService {
/**
* Binds the given pipeconf to the given infrastructure device. As a result of this method call,
* if the given pipeconf exposes any pipeline-specific behaviours, those will be merged to the
* device's driver.
* device's driver. Returns a completable future to provide async methods with a boolean if the merge
* of the drivers succeeded.
*
* @param deviceId a device identifier
* @param pipeconf a pipeconf identifier
* @param pipeconfId a pipeconf identifier
* @return a CompletableFuture with a boolean, true if operation succeeded
*/
// TODO: This service doesn't make any effort in deploying the configuration to the device.
// Someone else should do that.
void bindToDevice(PiPipeconfId pipeconf, DeviceId deviceId);
CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId);
/**
* Returns the pipeconf identifier currently associated with the given device identifier, if

View File

@ -0,0 +1,251 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.pi.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.DefaultDriver;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverAdminService;
import org.onosproject.net.driver.DriverProvider;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the PiPipeconfService.
*/
@Component(immediate = true)
@Service
public class PiPipeconfServiceImpl implements PiPipeconfService {
private final Logger log = getLogger(getClass());
private static final String DRIVER = "driver";
private static final String CFG_SCHEME = "piPipeconf";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverAdminService driverAdminService;
//TODO move to replicated map
protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
//TODO move to replicated map
protected ConcurrentHashMap<DeviceId, PiPipeconfId> devicesToPipeconf = new ConcurrentHashMap<>();
protected ExecutorService executor =
Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
"pipeline-to-device-%d", log));
protected final ConfigFactory factory =
new ConfigFactory<DeviceId, PiPipeconfConfig>(
SubjectFactories.DEVICE_SUBJECT_FACTORY,
PiPipeconfConfig.class, CFG_SCHEME) {
@Override
public PiPipeconfConfig createConfig() {
return new PiPipeconfConfig();
}
};
protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
@Activate
public void activate() {
cfgService.registerConfigFactory(factory);
cfgService.addListener(cfgListener);
cfgService.getSubjects(DeviceId.class, PiPipeconfConfig.class)
.forEach(this::addPipeconfFromCfg);
log.info("Started");
}
@Deactivate
public void deactivate() {
executor.shutdown();
cfgService.removeListener(cfgListener);
cfgService.unregisterConfigFactory(factory);
piPipeconfs.clear();
devicesToPipeconf.clear();
cfgService = null;
driverAdminService = null;
driverService = null;
log.info("Stopped");
}
@Override
public void register(PiPipeconf pipeconf) throws IllegalStateException {
log.warn("Currently using local maps, needs to be moved to a distributed store");
piPipeconfs.put(pipeconf.id(), pipeconf);
}
@Override
public Iterable<PiPipeconf> getPipeconfs() {
throw new UnsupportedOperationException("Currently unsupported");
}
@Override
public Optional<PiPipeconf> getPipeconf(PiPipeconfId id) {
return Optional.ofNullable(piPipeconfs.get(id));
}
@Override
public CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
CompletableFuture<Boolean> operationResult = new CompletableFuture<>();
executor.execute(() -> {
BasicDeviceConfig basicDeviceConfig =
cfgService.getConfig(deviceId, BasicDeviceConfig.class);
Driver baseDriver = driverService.getDriver(basicDeviceConfig.driver());
String completeDriverName = baseDriver.name() + ":" + pipeconfId;
PiPipeconf piPipeconf = piPipeconfs.get(pipeconfId);
if (piPipeconf == null) {
log.warn("Pipeconf {} is not present", pipeconfId);
operationResult.complete(false);
} else {
//if driver exists already we don't create a new one.
//needs to be done via exception catching due to DriverRegistry throwing it on a null return from
//the driver map.
try {
driverService.getDriver(completeDriverName);
} catch (ItemNotFoundException e) {
log.debug("First time pipeconf {} is used with base driver {}, merging the two",
pipeconfId, baseDriver);
//extract the behaviours from the pipipeconf.
Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours = new HashMap<>();
piPipeconf.behaviours().forEach(b -> {
behaviours.put(b, piPipeconf.implementation(b).get());
});
Driver piPipeconfDriver = new DefaultDriver(completeDriverName, baseDriver.parents(),
baseDriver.manufacturer(), baseDriver.hwVersion(), baseDriver.swVersion(),
behaviours, new HashMap<>());
//we take the base driver created with the behaviours of the PiPeconf and
// merge it with the base driver that was assigned to the device
Driver completeDriver = piPipeconfDriver.merge(baseDriver);
//This might lead to explosion of number of providers in the core,
// due to 1:1:1 pipeconf:driver:provider maybe find better way
DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
//we register to the dirver susbystem the driver provider containing the merged driver
driverAdminService.registerProvider(provider);
}
//Changing the configuration for the device to enforce the full driver with pipipeconf
// and base behaviours
ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
newCfg = newCfg.put(DRIVER, completeDriverName);
ObjectMapper mapper = new ObjectMapper();
JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
//Completable future is needed for when this method will also apply the pipeline to the device.
operationResult.complete(true);
}
});
return operationResult;
}
@Override
public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
return Optional.ofNullable(devicesToPipeconf.get(deviceId));
}
private class PiPipeconfDriverProviderInternal implements DriverProvider {
Driver driver;
PiPipeconfDriverProviderInternal(Driver driver) {
this.driver = driver;
}
@Override
public Set<Driver> getDrivers() {
return ImmutableSet.of(driver);
}
}
private void addPipeconfFromCfg(DeviceId deviceId) {
PiPipeconfConfig pipeconfConfig =
cfgService.getConfig(deviceId, PiPipeconfConfig.class);
PiPipeconfId id = pipeconfConfig.piPipeconfId();
if (id.id().equals("")) {
log.warn("Not adding empty pipeconfId for device {}", deviceId);
} else {
devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId());
}
}
/**
* Listener for configuration events.
*/
private class InternalNetworkConfigListener implements NetworkConfigListener {
@Override
public void event(NetworkConfigEvent event) {
DeviceId deviceId = (DeviceId) event.subject();
addPipeconfFromCfg(deviceId);
}
@Override
public boolean isRelevant(NetworkConfigEvent event) {
return event.configClass().equals(PiPipeconfConfig.class) &&
(event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
}
}
}

View File

@ -43,7 +43,7 @@ public class MockPipeconf implements PiPipeconf {
private final PiPipeconfId id;
private final PiPipelineModel pipelineModel;
private final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours;
protected final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours;
public MockPipeconf() throws IOException {
this.id = new PiPipeconfId(PIPECONF_ID);
@ -70,7 +70,7 @@ public class MockPipeconf implements PiPipeconf {
@Override
public Collection<Class<? extends Behaviour>> behaviours() {
return behaviours.values();
return behaviours.keySet();
}
@Override

View File

@ -0,0 +1,299 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.pi.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.Test;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerAdapter;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.ConfigApplyDelegate;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.NetworkConfigRegistryAdapter;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverAdapter;
import org.onosproject.net.driver.DriverAdminService;
import org.onosproject.net.driver.DriverAdminServiceAdapter;
import org.onosproject.net.driver.DriverProvider;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.driver.DriverServiceAdapter;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Unit Test Class for PiPipeconfServiceImpl.
*/
public class PiPipeconfServiceImplTest {
private static final DeviceId DEVICE_ID = DeviceId.deviceId("test:test");
private static final String BASE_DRIVER = "baseDriver";
private static final Set<Class<? extends Behaviour>> EXPECTED_BEHAVIOURS =
ImmutableSet.of(DeviceDescriptionDiscovery.class, Pipeliner.class, PiPipelineInterpreter.class);
//Mock util sets and classes
private final NetworkConfigRegistry cfgService = new MockNetworkConfigRegistry();
private final DriverService driverService = new MockDriverService();
private final DriverAdminService driverAdminService = new MockDriverAdminService();
private Driver baseDriver = new MockDriver();
private String completeDriverName;
private final Set<ConfigFactory> cfgFactories = new HashSet<>();
private final Set<NetworkConfigListener> netCfgListeners = new HashSet<>();
private final Set<DriverProvider> providers = new HashSet<>();
private final PiPipeconfConfig piPipeconfConfig = new PiPipeconfConfig();
private final InputStream jsonStream = PiPipeconfServiceImplTest.class
.getResourceAsStream("/org/onosproject/net/pi/impl/piPipeconfId.json");
private final BasicDeviceConfig basicDeviceConfig = new BasicDeviceConfig();
private final InputStream jsonStreamBasic = PiPipeconfServiceImplTest.class
.getResourceAsStream("/org/onosproject/net/pi/impl/basic.json");
//Services
private PiPipeconfServiceImpl piPipeconfService;
private MockPipeconf piPipeconf;
@Before
public void setUp() throws IOException {
piPipeconfService = new PiPipeconfServiceImpl();
piPipeconf = new MockPipeconf();
completeDriverName = BASE_DRIVER + ":" + piPipeconf.id();
piPipeconf.behaviours.put(Pipeliner.class, PipelinerAdapter.class);
piPipeconfService.cfgService = cfgService;
piPipeconfService.driverService = driverService;
piPipeconfService.driverAdminService = driverAdminService;
String key = "piPipeconf";
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(jsonStream);
ConfigApplyDelegate delegate = new MockDelegate();
piPipeconfConfig.init(DEVICE_ID, key, jsonNode, mapper, delegate);
String keyBasic = "basic";
JsonNode jsonNodeBasic = mapper.readTree(jsonStreamBasic);
basicDeviceConfig.init(DEVICE_ID, keyBasic, jsonNodeBasic, mapper, delegate);
piPipeconfService.activate();
}
@Test
public void activate() {
assertEquals("Incorrect driver service", driverService, piPipeconfService.driverService);
assertEquals("Incorrect driverAdminService service", driverAdminService, piPipeconfService.driverAdminService);
assertEquals("Incorrect configuration service", cfgService, piPipeconfService.cfgService);
assertTrue("Incorrect config factory", cfgFactories.contains(piPipeconfService.factory));
assertTrue("Incorrect network configuration listener", netCfgListeners.contains(piPipeconfService.cfgListener));
}
@Test
public void deactivate() {
piPipeconfService.deactivate();
assertEquals("Incorrect driver service", null, piPipeconfService.driverService);
assertEquals("Incorrect driverAdminService service", null, piPipeconfService.driverAdminService);
assertEquals("Incorrect configuration service", null, piPipeconfService.cfgService);
assertFalse("Config factory should be unregistered", cfgFactories.contains(piPipeconfService.factory));
assertFalse("Network configuration listener should be unregistered",
netCfgListeners.contains(piPipeconfService.cfgListener));
}
@Test
public void register() {
piPipeconfService.register(piPipeconf);
assertTrue("PiPipeconf should be registered", piPipeconfService.piPipeconfs.contains(piPipeconf));
}
@Test
public void getPipeconf() {
piPipeconfService.register(piPipeconf);
assertEquals("Returned PiPipeconf is not correct", piPipeconf,
piPipeconfService.getPipeconf(piPipeconf.id()).get());
}
@Test
public void bindToDevice() throws Exception {
PiPipeconfId piPipeconfId = cfgService.getConfig(DEVICE_ID, PiPipeconfConfig.class).piPipeconfId();
assertEquals(piPipeconf.id(), piPipeconfId);
String baseDriverName = cfgService.getConfig(DEVICE_ID, BasicDeviceConfig.class).driver();
assertEquals(BASE_DRIVER, baseDriverName);
piPipeconfService.register(piPipeconf);
assertEquals("Returned PiPipeconf is not correct", piPipeconf,
piPipeconfService.getPipeconf(piPipeconf.id()).get());
piPipeconfService.bindToDevice(piPipeconfId, DEVICE_ID).whenComplete((booleanResult, ex) -> {
//we assume that the provider is 1 and that it contains 1 driver
//we also assume that everything after driverAdminService.registerProvider(provider); has been tested.
assertTrue("Provider should be registered", providers.size() != 0);
assertTrue("Boolean Result of method should be True", booleanResult);
providers.forEach(p -> {
assertTrue("Provider should contain a driver", p.getDrivers().size() != 0);
p.getDrivers().forEach(driver -> {
assertEquals("The driver has wrong name", driver.name(), completeDriverName);
assertEquals("The driver contains wrong behaviours", EXPECTED_BEHAVIOURS, driver.behaviours());
});
});
}).exceptionally(ex -> {
throw new IllegalStateException(ex);
});
}
private class MockNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
@Override
public void registerConfigFactory(ConfigFactory configFactory) {
cfgFactories.add(configFactory);
}
@Override
public void unregisterConfigFactory(ConfigFactory configFactory) {
cfgFactories.remove(configFactory);
}
@Override
public void addListener(NetworkConfigListener listener) {
netCfgListeners.add(listener);
}
@Override
public void removeListener(NetworkConfigListener listener) {
netCfgListeners.remove(listener);
}
@Override
public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
DeviceId did = (DeviceId) subject;
if (configClass.equals(PiPipeconfConfig.class)
&& did.equals(DEVICE_ID)) {
return (C) piPipeconfConfig;
} else if (configClass.equals(BasicDeviceConfig.class)
&& did.equals(DEVICE_ID)) {
return (C) basicDeviceConfig;
}
return null;
}
}
private class MockDriverService extends DriverServiceAdapter {
@Override
public Driver getDriver(String driverName) {
if (driverName.equals(BASE_DRIVER)) {
return baseDriver;
}
throw new ItemNotFoundException("Driver not found");
}
}
private class MockDriverAdminService extends DriverAdminServiceAdapter {
@Override
public void registerProvider(DriverProvider provider) {
providers.add(provider);
}
}
private class MockDelegate implements ConfigApplyDelegate {
@Override
public void onApply(Config configFile) {
}
}
private class MockDriver extends DriverAdapter {
@Override
public List<Driver> parents() {
return ImmutableList.of();
}
@Override
public String manufacturer() {
return "On.Lab";
}
@Override
public String hwVersion() {
return "testHW";
}
@Override
public Class<? extends Behaviour> implementation(Class<? extends Behaviour> behaviour) {
return MockDeviceDescriptionDiscovery.class;
}
@Override
public Map<String, String> properties() {
return new HashMap<>();
}
@Override
public Set<Class<? extends Behaviour>> behaviours() {
return ImmutableSet.of(DeviceDescriptionDiscovery.class);
}
@Override
public String swVersion() {
return "testSW";
}
@Override
public String name() {
return BASE_DRIVER;
}
}
private class MockDeviceDescriptionDiscovery extends AbstractHandlerBehaviour
implements DeviceDescriptionDiscovery {
@Override
public DeviceDescription discoverDeviceDetails() {
return null;
}
@Override
public List<PortDescription> discoverPortDetails() {
return null;
}
}
}

View File

@ -0,0 +1,3 @@
{
"driver": "baseDriver"
}

View File

@ -0,0 +1,3 @@
{
"piPipeconfId": "org.project.pipeconf.default"
}

View File

@ -17,6 +17,8 @@
package org.onosproject.provider.general.device.impl;
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@ -58,7 +60,9 @@ import org.onosproject.net.driver.DefaultDriverHandler;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.key.DeviceKeyAdminService;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
@ -66,12 +70,19 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
@ -87,6 +98,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
public static final String DRIVER = "driver";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@ -105,7 +117,7 @@ public class GeneralDeviceProvider extends AbstractProvider
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceKeyAdminService deviceKeyAdminService;
protected PiPipeconfService piPipeconfService;
protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
protected static final String URI_SCHEME = "device";
@ -114,6 +126,15 @@ public class GeneralDeviceProvider extends AbstractProvider
private static final int CORE_POOL_SIZE = 10;
private static final String UNKNOWN = "unknown";
private static final int PORT_STATS_PERIOD_SECONDS = 10;
//FIXME this will be removed when the configuration is synced at the source.
private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
//FIXME to be removed when netcfg will issue device events in a bundle or
//ensures all configuration needed is present
private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
protected ScheduledExecutorService connectionExecutor
@ -279,7 +300,7 @@ public class GeneralDeviceProvider extends AbstractProvider
log.error("Configuration is NULL: basic config {}, general provider " +
"config {}", basicDeviceConfig, providerConfig);
} else {
log.info("Connecting to device {}", deviceId);
log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Driver driver = driverService.getDriver(basicDeviceConfig.driver());
DriverData driverData = new DefaultDriverData(driver, deviceId);
@ -287,8 +308,11 @@ public class GeneralDeviceProvider extends AbstractProvider
DeviceHandshaker handshaker =
getBehaviour(driver, DeviceHandshaker.class, driverData);
if (handshaker != null) {
if (handshaker == null) {
log.error("Device {}, with driver {} does not support DeviceHandshaker " +
"behaviour, {}", deviceId, driver.name(), driver.behaviours());
return;
}
//Storing deviceKeyId and all other config values
// as data in the driver with protocol_<info>
// name as the key. e.g protocol_ip
@ -329,18 +353,38 @@ public class GeneralDeviceProvider extends AbstractProvider
}
ports = deviceDiscovery.discoverPortDetails();
}
providerService.deviceConnected(deviceId, description);
providerService.updatePorts(deviceId, ports);
Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
//Apply the Pipeline configuration and then connect the device
if (pipeconfId.isPresent()) {
DeviceDescription finalDescription = description;
List<PortDescription> finalPorts = ports;
piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
if (success) {
advertiseDevice(deviceId, finalDescription, finalPorts);
} else {
log.error("Can't merge driver {} with pipeconf {} for device {}, " +
"not reporting it to the device manager",
driver.name(), pipeconfId.get(), deviceId);
}
}).exceptionally(ex -> {
throw new IllegalStateException(ex);
});
} else {
//No other operation is needed, advertise the device to the core.
advertiseDevice(deviceId, description, ports);
}
} else {
log.warn("Can't connect to device {}", deviceId);
}
});
} else {
log.error("Device {}, with driver {} does not support DeviceHandshaker " +
"behaviour, {}", deviceId, driver.name(), driver.behaviours());
}
}
private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
providerService.deviceConnected(deviceId, description);
providerService.updatePorts(deviceId, ports);
}
private void disconnectDevice(DeviceId deviceId) {
@ -396,21 +440,78 @@ public class GeneralDeviceProvider extends AbstractProvider
log.debug("{} is not my scheme, skipping", deviceId);
return;
}
if (deviceService.getDevice(deviceId) == null || !deviceService.isAvailable(deviceId)) {
connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
} else {
if (deviceService.getDevice(deviceId) != null || deviceService.isAvailable(deviceId)) {
log.info("Device {} is already connected to ONOS and is available", deviceId);
return;
}
//FIXME to be removed when netcfg will issue device events in a bundle or
// ensure all configuration needed is present
Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
lock.lock();
try {
if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
//FIXME we currently assume that p4runtime devices are pipeline configurable.
//If we want to connect a p4runtime device with no pipeline
if (event.config().isPresent() &&
Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
PIPELINE_CONFIGURABLE_PROTOCOLS)) {
pipelineConfigured.add(deviceId);
}
deviceConfigured.add(deviceId);
} else if (event.configClass().equals(BasicDeviceConfig.class)) {
if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
//TODO add check for pipeline and add it to the pipeline list if no
// p4runtime is present.
driverConfigured.add(deviceId);
}
} else if (event.configClass().equals(PiPipeconfConfig.class)) {
if (event.config().isPresent()
&& event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
pipelineConfigured.add(deviceId);
}
}
//if the device has no "pipeline configurable protocol it will be present
// in the pipelineConfigured
if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
&& pipelineConfigured.contains(deviceId)) {
checkAndSubmitDeviceTask(deviceId);
} else {
if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
log.debug("Waiting for pipeline configuration for device {}", deviceId);
} else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
log.debug("Waiting for device configuration for device {}", deviceId);
} else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
log.debug("Waiting for driver configuration for device {}", deviceId);
} else if (driverConfigured.contains(deviceId)) {
log.debug("Only driver configuration for device {}", deviceId);
} else if (deviceConfigured.contains(deviceId)) {
log.debug("Only device configuration for device {}", deviceId);
}
}
} finally {
lock.unlock();
}
}
@Override
public boolean isRelevant(NetworkConfigEvent event) {
return event.configClass().equals(GeneralProviderDeviceConfig.class) &&
return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
event.configClass().equals(BasicDeviceConfig.class) ||
event.configClass().equals(PiPipeconfConfig.class)) &&
(event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
}
}
private void checkAndSubmitDeviceTask(DeviceId deviceId) {
connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
//FIXME this will be removed when configuration is synced.
deviceConfigured.remove(deviceId);
driverConfigured.remove(deviceId);
pipelineConfigured.remove(deviceId);
}
/**
* Listener for core device events.
*/

View File

@ -13,6 +13,9 @@
"deviceKeyId": "p4runtime:device:identifier"
}
},
"piPipeconf":{
"piPipeconfId": "pipipeconfTest"
},
"basic": {
"driver": "bmv2"
}