[ONOS-6556] Distributed Implementation of PiPipeconfService

Change-Id: I7196ce6eee333e732c0cd5015d4d8d32ee069e27
This commit is contained in:
Andrea Campanella 2017-07-13 14:14:41 +02:00 committed by Thomas Vachuska
parent 7f8d14db82
commit f9c409a3d8
10 changed files with 447 additions and 10 deletions

View File

@ -134,7 +134,7 @@ public class DistributedDhcpRelayStoreTest {
assertEquals(record.ip6Status(), removedRecord.ip6Status()); assertEquals(record.ip6Status(), removedRecord.ip6Status());
assertEquals(record.directlyConnected(), removedRecord.directlyConnected()); assertEquals(record.directlyConnected(), removedRecord.directlyConnected());
event = recordComplete.join(); event = recordComplete.join();
assertEquals(null, event.subject()); assertEquals(record, event.subject());
assertEquals(DhcpRelayStoreEvent.Type.REMOVED, event.type()); assertEquals(DhcpRelayStoreEvent.Type.REMOVED, event.type());
recordInStore = store.getDhcpRecord(HOST_ID).orElse(null); recordInStore = store.getDhcpRecord(HOST_ID).orElse(null);
assertNull(recordInStore); assertNull(recordInStore);

View File

@ -0,0 +1,55 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.pi.runtime;
import com.google.common.annotations.Beta;
import org.onosproject.event.AbstractEvent;
import org.onosproject.net.DeviceId;
/**
* Entity that represents pipeconf to device binding events.
*/
@Beta
public class PiPipeconfDeviceMappingEvent extends AbstractEvent<PiPipeconfDeviceMappingEvent.Type, DeviceId> {
/**
* Type of pipeconf to device mapping event.
*/
public enum Type {
/**
* Individual mapping pipeconf to device added.
*/
CREATED,
/**
* Individual mapping pipeconf to device removed.
*/
REMOVED,
}
/**
* Creates an event due to one Pipeconf being mapped to a device.
*
* @param type event type
* @param deviceId the deviceId for which the pipeconf was bound or updated.
*/
public PiPipeconfDeviceMappingEvent(PiPipeconfDeviceMappingEvent.Type type, DeviceId deviceId) {
super(type, deviceId);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.pi.runtime;
import com.google.common.annotations.Beta;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.store.Store;
import java.util.Set;
/**
* Manages the mapping of Pipeconfs that are deployed to devices; not intended for direct use.
*/
@Beta
public interface PiPipeconfMappingStore extends Store<PiPipeconfDeviceMappingEvent, PiPipeconfMappingStoreDelegate> {
/**
* Retrieves the id of the pipeconf deployed on a given device.
*
* @param deviceId device identifier
* @return PiPipeconfId
*/
PiPipeconfId getPipeconfId(DeviceId deviceId);
/**
* Retrieves the set of devices on which the given pipeconf is applied.
*
* @param pipeconfId pipeconf identifier
* @return the set of devices that have that pipeconf applied.
*/
Set<DeviceId> getDevices(PiPipeconfId pipeconfId);
/**
* Stores or updates a binding between a device and the pipeconf deployed on it.
*
* @param deviceId deviceId
* @param pipeconfId pipeconfId
*/
void createOrUpdateBinding(DeviceId deviceId, PiPipeconfId pipeconfId);
/**
* Removes device to pipeconf binding.
*
* @param deviceId deviceId
*/
void removeBinding(DeviceId deviceId);
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.pi.runtime;
import com.google.common.annotations.Beta;
import org.onosproject.store.StoreDelegate;
/**
* Pipeconf store delegate abstraction.
*/
@Beta
public interface PiPipeconfMappingStoreDelegate extends StoreDelegate<PiPipeconfDeviceMappingEvent> {
}

View File

@ -89,4 +89,5 @@ public interface PiPipeconfService {
* @return an optional pipeconf identifier * @return an optional pipeconf identifier
*/ */
Optional<PiPipeconfId> ofDevice(DeviceId deviceId); Optional<PiPipeconfId> ofDevice(DeviceId deviceId);
} }

View File

@ -103,7 +103,7 @@ public final class TestEventuallyConsistentMap<K, V> extends EventuallyConsisten
if (result != null) { if (result != null) {
EventuallyConsistentMapEvent<K, V> removeEvent = EventuallyConsistentMapEvent<K, V> removeEvent =
new EventuallyConsistentMapEvent<>(mapName, REMOVE, new EventuallyConsistentMapEvent<>(mapName, REMOVE,
key, map.get(key)); key, result);
notifyListeners(removeEvent); notifyListeners(removeEvent);
} }
return result; return result;

View File

@ -44,6 +44,7 @@ import org.onosproject.net.driver.DriverService;
import org.onosproject.net.pi.model.PiPipeconf; import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId; import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.runtime.PiPipeconfConfig; import org.onosproject.net.pi.runtime.PiPipeconfConfig;
import org.onosproject.net.pi.runtime.PiPipeconfMappingStore;
import org.onosproject.net.pi.runtime.PiPipeconfService; import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -83,10 +84,11 @@ public class PiPipeconfManager implements PiPipeconfService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverAdminService driverAdminService; protected DriverAdminService driverAdminService;
//TODO move to replicated map @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PiPipeconfMappingStore pipeconfMappingStore;
// Registered pipeconf are replicated through the app subsystem and registered on app activated events.
protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>(); protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
//TODO move to replicated map
protected ConcurrentHashMap<DeviceId, PiPipeconfId> devicesToPipeconf = new ConcurrentHashMap<>();
protected ExecutorService executor = protected ExecutorService executor =
Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice", Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
@ -120,7 +122,6 @@ public class PiPipeconfManager implements PiPipeconfService {
cfgService.removeListener(cfgListener); cfgService.removeListener(cfgListener);
cfgService.unregisterConfigFactory(factory); cfgService.unregisterConfigFactory(factory);
piPipeconfs.clear(); piPipeconfs.clear();
devicesToPipeconf.clear();
cfgService = null; cfgService = null;
driverAdminService = null; driverAdminService = null;
driverService = null; driverService = null;
@ -139,11 +140,12 @@ public class PiPipeconfManager implements PiPipeconfService {
@Override @Override
public void remove(PiPipeconfId pipeconfId) throws IllegalStateException { public void remove(PiPipeconfId pipeconfId) throws IllegalStateException {
//TODO move to the distributed mechanism
//TODO add mechanism to remove from device. //TODO add mechanism to remove from device.
if (!piPipeconfs.containsKey(pipeconfId)) { if (!piPipeconfs.containsKey(pipeconfId)) {
throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId)); throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId));
} }
// TODO remove the binding from the distributed Store when the lifecycle of a pipeconf is defined.
// pipeconfMappingStore.removeBindings(pipeconfId);
piPipeconfs.remove(pipeconfId); piPipeconfs.remove(pipeconfId);
} }
@ -212,7 +214,7 @@ public class PiPipeconfManager implements PiPipeconfService {
// Completable future is needed for when this method will also apply the pipeline to the device. // Completable future is needed for when this method will also apply the pipeline to the device.
// FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here // FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here
// the association between device and pipeconf. // the association between device and pipeconf.
devicesToPipeconf.put(deviceId, pipeconfId); pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
operationResult.complete(true); operationResult.complete(true);
} }
}); });
@ -221,9 +223,10 @@ public class PiPipeconfManager implements PiPipeconfService {
@Override @Override
public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) { public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
return Optional.ofNullable(devicesToPipeconf.get(deviceId)); return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
} }
private class PiPipeconfDriverProviderInternal implements DriverProvider { private class PiPipeconfDriverProviderInternal implements DriverProvider {
Driver driver; Driver driver;
@ -245,7 +248,7 @@ public class PiPipeconfManager implements PiPipeconfService {
if (id.id().equals("")) { if (id.id().equals("")) {
log.warn("Not adding empty pipeconfId for device {}", deviceId); log.warn("Not adding empty pipeconfId for device {}", deviceId);
} else { } else {
devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId()); pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
} }
} }

View File

@ -0,0 +1,142 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.pi.impl;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.runtime.PiPipeconfDeviceMappingEvent;
import org.onosproject.net.pi.runtime.PiPipeconfMappingStore;
import org.onosproject.net.pi.runtime.PiPipeconfMappingStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages information of pipeconf to device binding using gossip protocol to distribute
* information.
*/
@Component(immediate = true)
@Service
public class DistributedDevicePipeconfMappingStore
extends AbstractStore<PiPipeconfDeviceMappingEvent, PiPipeconfMappingStoreDelegate>
implements PiPipeconfMappingStore {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
protected EventuallyConsistentMap<DeviceId, PiPipeconfId> deviceToPipeconf;
protected final EventuallyConsistentMapListener<DeviceId, PiPipeconfId> pipeconfListener =
new InternalPiPipeconfListener();
protected ConcurrentMap<PiPipeconfId, Set<DeviceId>> pipeconfToDevices = new ConcurrentHashMap<>();
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MultiValuedTimestamp.class);
deviceToPipeconf = storageService.<DeviceId, PiPipeconfId>eventuallyConsistentMapBuilder()
.withName("onos-pipeconf-table")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
deviceToPipeconf.addListener(pipeconfListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
deviceToPipeconf.removeListener(pipeconfListener);
deviceToPipeconf = null;
pipeconfToDevices = null;
log.info("Stopped");
}
@Override
public PiPipeconfId getPipeconfId(DeviceId deviceId) {
return deviceToPipeconf.get(deviceId);
}
@Override
public Set<DeviceId> getDevices(PiPipeconfId pipeconfId) {
return pipeconfToDevices.get(pipeconfId);
}
@Override
public void createOrUpdateBinding(DeviceId deviceId, PiPipeconfId pipeconfId) {
deviceToPipeconf.put(deviceId, pipeconfId);
}
@Override
public void removeBinding(DeviceId deviceId) {
deviceToPipeconf.remove(deviceId);
}
private class InternalPiPipeconfListener implements EventuallyConsistentMapListener<DeviceId, PiPipeconfId> {
@Override
public void event(EventuallyConsistentMapEvent<DeviceId, PiPipeconfId> mapEvent) {
final PiPipeconfDeviceMappingEvent.Type type;
final DeviceId deviceId = mapEvent.key();
final PiPipeconfId pipeconfId = mapEvent.value();
switch (mapEvent.type()) {
case PUT:
type = PiPipeconfDeviceMappingEvent.Type.CREATED;
pipeconfToDevices.compute(pipeconfId, (pipeconf, devices) -> {
if (devices == null) {
devices = Sets.newConcurrentHashSet();
}
devices.add(deviceId);
return devices;
});
break;
case REMOVE:
type = PiPipeconfDeviceMappingEvent.Type.REMOVED;
pipeconfToDevices.computeIfPresent(pipeconfId, (pipeconf, devices) -> {
devices.remove(deviceId);
return devices;
});
break;
default:
throw new IllegalArgumentException("Wrong event type " + mapEvent.type());
}
notifyDelegate(new PiPipeconfDeviceMappingEvent(type, deviceId));
}
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation of distributed Pipeconf to device mapping store.
*/
package org.onosproject.store.pi.impl;

View File

@ -0,0 +1,122 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.pi.impl;
import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.store.service.TestStorageService;
import static org.junit.Assert.*;
/**
* Test class for the Distributed Device to Pipeconf store.
*/
public class DistributedDevicePipeconfMappingStoreTest {
private DistributedDevicePipeconfMappingStore store;
private static final DeviceId DEVICE_ID = DeviceId.deviceId("foo:bar");
private static final PiPipeconfId PIPECONF_ID = new PiPipeconfId("foo-pipeconf");
/**
* Sets up the device key store and the storage service test harness.
*/
@Before
public void setUp() {
store = new DistributedDevicePipeconfMappingStore();
store.storageService = new TestStorageService();
store.setDelegate(event -> {
});
store.activate();
}
/**
* Test for activate.
*/
@Test
public void activate() {
assertNotNull(store.storageService);
assertTrue("Store must have delegate", store.hasDelegate());
assertTrue("No value should be in the map", store.deviceToPipeconf.isEmpty());
assertTrue("No value should be in the map", store.pipeconfToDevices.isEmpty());
}
/**
* Test for deactivate.
*/
@Test
public void deactivate() {
store.deactivate();
assertNull("Should be null", store.deviceToPipeconf);
assertNull("Should be null", store.pipeconfToDevices);
}
/**
* Test for saving the binding in eventually consistent map and in reverse map.
*/
@Test
public void createOrUpdatePipeconfToDeviceBinding() {
store.createOrUpdateBinding(DEVICE_ID, PIPECONF_ID);
assertTrue("Value should be in the map", store.deviceToPipeconf.containsKey(DEVICE_ID));
assertTrue("Value should be in the map", store.deviceToPipeconf.containsValue(PIPECONF_ID));
assertTrue("Value should be in the map", store.pipeconfToDevices.containsKey(PIPECONF_ID));
assertTrue("Value should be in the map", store.pipeconfToDevices.containsValue(ImmutableSet.of(DEVICE_ID)));
}
/**
* Test for getting the deviceId to pipeconfId binding.
*/
@Test
public void getPipeconfIdDevice() throws Exception {
clear();
createOrUpdatePipeconfToDeviceBinding();
assertEquals("Wrong PipeconfId", store.getPipeconfId(DEVICE_ID), PIPECONF_ID);
}
/**
* Test for getting the pipeconfId to devices binding.
*/
@Test
public void getDevices() {
clear();
createOrUpdatePipeconfToDeviceBinding();
assertEquals("Wrong set of DeviceIds", store.getDevices(PIPECONF_ID), ImmutableSet.of(DEVICE_ID));
}
/**
* Test for clearing binding for a given device.
*/
@Test
public void clearDeviceToPipeconfBinding() throws Exception {
clear();
createOrUpdatePipeconfToDeviceBinding();
store.removeBinding(DEVICE_ID);
assertFalse("Unexpected DeviceId in the map", store.deviceToPipeconf.containsKey(DEVICE_ID));
assertTrue("No value should be in the map", store.pipeconfToDevices.get(PIPECONF_ID).isEmpty());
}
/**
* Clears the store and revers map.
*/
private void clear() {
store.pipeconfToDevices.clear();
store.deviceToPipeconf.clear();
}
}