diff --git a/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java b/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java index a010ddc1cb..81491d0aa1 100644 --- a/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java +++ b/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java @@ -134,7 +134,7 @@ public class DistributedDhcpRelayStoreTest { assertEquals(record.ip6Status(), removedRecord.ip6Status()); assertEquals(record.directlyConnected(), removedRecord.directlyConnected()); event = recordComplete.join(); - assertEquals(null, event.subject()); + assertEquals(record, event.subject()); assertEquals(DhcpRelayStoreEvent.Type.REMOVED, event.type()); recordInStore = store.getDhcpRecord(HOST_ID).orElse(null); assertNull(recordInStore); diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfDeviceMappingEvent.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfDeviceMappingEvent.java new file mode 100644 index 0000000000..540967c741 --- /dev/null +++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfDeviceMappingEvent.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.net.pi.runtime; + +import com.google.common.annotations.Beta; +import org.onosproject.event.AbstractEvent; +import org.onosproject.net.DeviceId; + +/** + * Entity that represents pipeconf to device binding events. + */ +@Beta +public class PiPipeconfDeviceMappingEvent extends AbstractEvent { + + /** + * Type of pipeconf to device mapping event. + */ + public enum Type { + + /** + * Individual mapping pipeconf to device added. + */ + CREATED, + + /** + * Individual mapping pipeconf to device removed. + */ + REMOVED, + } + + /** + * Creates an event due to one Pipeconf being mapped to a device. + * + * @param type event type + * @param deviceId the deviceId for which the pipeconf was bound or updated. + */ + public PiPipeconfDeviceMappingEvent(PiPipeconfDeviceMappingEvent.Type type, DeviceId deviceId) { + super(type, deviceId); + } + +} diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStore.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStore.java new file mode 100644 index 0000000000..76438b2baa --- /dev/null +++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStore.java @@ -0,0 +1,66 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.onosproject.net.pi.runtime; + +import com.google.common.annotations.Beta; +import org.onosproject.net.DeviceId; +import org.onosproject.net.pi.model.PiPipeconfId; +import org.onosproject.store.Store; + +import java.util.Set; + +/** + * Manages the mapping of Pipeconfs that are deployed to devices; not intended for direct use. + */ +@Beta +public interface PiPipeconfMappingStore extends Store { + + /** + * Retrieves the id of the pipeconf deployed on a given device. + * + * @param deviceId device identifier + * @return PiPipeconfId + */ + PiPipeconfId getPipeconfId(DeviceId deviceId); + + /** + * Retrieves the set of devices on which the given pipeconf is applied. + * + * @param pipeconfId pipeconf identifier + * @return the set of devices that have that pipeconf applied. + */ + Set getDevices(PiPipeconfId pipeconfId); + + /** + * Stores or updates a binding between a device and the pipeconf deployed on it. + * + * @param deviceId deviceId + * @param pipeconfId pipeconfId + */ + + void createOrUpdateBinding(DeviceId deviceId, PiPipeconfId pipeconfId); + + /** + * Removes device to pipeconf binding. + * + * @param deviceId deviceId + */ + + void removeBinding(DeviceId deviceId); + +} diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStoreDelegate.java new file mode 100644 index 0000000000..c2fbf17cfc --- /dev/null +++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStoreDelegate.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.onosproject.net.pi.runtime; + +import com.google.common.annotations.Beta; +import org.onosproject.store.StoreDelegate; + +/** + * Pipeconf store delegate abstraction. + */ +@Beta +public interface PiPipeconfMappingStoreDelegate extends StoreDelegate { +} diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java index c43be6dc44..79bc624b85 100644 --- a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java +++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java @@ -89,4 +89,5 @@ public interface PiPipeconfService { * @return an optional pipeconf identifier */ Optional ofDevice(DeviceId deviceId); + } diff --git a/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java b/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java index b28356d6ef..4a8dc478b5 100644 --- a/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java +++ b/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java @@ -103,7 +103,7 @@ public final class TestEventuallyConsistentMap extends EventuallyConsisten if (result != null) { EventuallyConsistentMapEvent removeEvent = new EventuallyConsistentMapEvent<>(mapName, REMOVE, - key, map.get(key)); + key, result); notifyListeners(removeEvent); } return result; diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java index 9072f3f275..a0bf93e9ff 100644 --- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java +++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java @@ -44,6 +44,7 @@ import org.onosproject.net.driver.DriverService; import org.onosproject.net.pi.model.PiPipeconf; import org.onosproject.net.pi.model.PiPipeconfId; import org.onosproject.net.pi.runtime.PiPipeconfConfig; +import org.onosproject.net.pi.runtime.PiPipeconfMappingStore; import org.onosproject.net.pi.runtime.PiPipeconfService; import org.slf4j.Logger; @@ -83,10 +84,11 @@ public class PiPipeconfManager implements PiPipeconfService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DriverAdminService driverAdminService; - //TODO move to replicated map + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected PiPipeconfMappingStore pipeconfMappingStore; + + // Registered pipeconf are replicated through the app subsystem and registered on app activated events. protected ConcurrentHashMap piPipeconfs = new ConcurrentHashMap<>(); - //TODO move to replicated map - protected ConcurrentHashMap devicesToPipeconf = new ConcurrentHashMap<>(); protected ExecutorService executor = Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice", @@ -120,7 +122,6 @@ public class PiPipeconfManager implements PiPipeconfService { cfgService.removeListener(cfgListener); cfgService.unregisterConfigFactory(factory); piPipeconfs.clear(); - devicesToPipeconf.clear(); cfgService = null; driverAdminService = null; driverService = null; @@ -139,11 +140,12 @@ public class PiPipeconfManager implements PiPipeconfService { @Override public void remove(PiPipeconfId pipeconfId) throws IllegalStateException { - //TODO move to the distributed mechanism //TODO add mechanism to remove from device. if (!piPipeconfs.containsKey(pipeconfId)) { throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId)); } + // TODO remove the binding from the distributed Store when the lifecycle of a pipeconf is defined. + // pipeconfMappingStore.removeBindings(pipeconfId); piPipeconfs.remove(pipeconfId); } @@ -212,7 +214,7 @@ public class PiPipeconfManager implements PiPipeconfService { // Completable future is needed for when this method will also apply the pipeline to the device. // FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here // the association between device and pipeconf. - devicesToPipeconf.put(deviceId, pipeconfId); + pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId); operationResult.complete(true); } }); @@ -221,9 +223,10 @@ public class PiPipeconfManager implements PiPipeconfService { @Override public Optional ofDevice(DeviceId deviceId) { - return Optional.ofNullable(devicesToPipeconf.get(deviceId)); + return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId)); } + private class PiPipeconfDriverProviderInternal implements DriverProvider { Driver driver; @@ -245,7 +248,7 @@ public class PiPipeconfManager implements PiPipeconfService { if (id.id().equals("")) { log.warn("Not adding empty pipeconfId for device {}", deviceId); } else { - devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId()); + pipeconfMappingStore.createOrUpdateBinding(deviceId, id); } } diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java new file mode 100644 index 0000000000..d07429aaba --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java @@ -0,0 +1,142 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.store.pi.impl; + +import com.google.common.collect.Sets; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onosproject.net.DeviceId; +import org.onosproject.net.pi.model.PiPipeconfId; +import org.onosproject.net.pi.runtime.PiPipeconfDeviceMappingEvent; +import org.onosproject.net.pi.runtime.PiPipeconfMappingStore; +import org.onosproject.net.pi.runtime.PiPipeconfMappingStoreDelegate; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.MultiValuedTimestamp; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.WallClockTimestamp; +import org.slf4j.Logger; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages information of pipeconf to device binding using gossip protocol to distribute + * information. + */ +@Component(immediate = true) +@Service +public class DistributedDevicePipeconfMappingStore + extends AbstractStore + implements PiPipeconfMappingStore { + + private final Logger log = getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + protected EventuallyConsistentMap deviceToPipeconf; + + protected final EventuallyConsistentMapListener pipeconfListener = + new InternalPiPipeconfListener(); + + protected ConcurrentMap> pipeconfToDevices = new ConcurrentHashMap<>(); + + @Activate + public void activate() { + KryoNamespace.Builder serializer = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(MultiValuedTimestamp.class); + deviceToPipeconf = storageService.eventuallyConsistentMapBuilder() + .withName("onos-pipeconf-table") + .withSerializer(serializer) + .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); + deviceToPipeconf.addListener(pipeconfListener); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + deviceToPipeconf.removeListener(pipeconfListener); + deviceToPipeconf = null; + pipeconfToDevices = null; + log.info("Stopped"); + } + + @Override + public PiPipeconfId getPipeconfId(DeviceId deviceId) { + return deviceToPipeconf.get(deviceId); + } + + @Override + public Set getDevices(PiPipeconfId pipeconfId) { + return pipeconfToDevices.get(pipeconfId); + } + + @Override + public void createOrUpdateBinding(DeviceId deviceId, PiPipeconfId pipeconfId) { + deviceToPipeconf.put(deviceId, pipeconfId); + } + + @Override + public void removeBinding(DeviceId deviceId) { + deviceToPipeconf.remove(deviceId); + } + + private class InternalPiPipeconfListener implements EventuallyConsistentMapListener { + + @Override + public void event(EventuallyConsistentMapEvent mapEvent) { + final PiPipeconfDeviceMappingEvent.Type type; + final DeviceId deviceId = mapEvent.key(); + final PiPipeconfId pipeconfId = mapEvent.value(); + switch (mapEvent.type()) { + case PUT: + type = PiPipeconfDeviceMappingEvent.Type.CREATED; + pipeconfToDevices.compute(pipeconfId, (pipeconf, devices) -> { + if (devices == null) { + devices = Sets.newConcurrentHashSet(); + } + devices.add(deviceId); + return devices; + }); + break; + case REMOVE: + type = PiPipeconfDeviceMappingEvent.Type.REMOVED; + pipeconfToDevices.computeIfPresent(pipeconfId, (pipeconf, devices) -> { + devices.remove(deviceId); + return devices; + }); + break; + default: + throw new IllegalArgumentException("Wrong event type " + mapEvent.type()); + } + notifyDelegate(new PiPipeconfDeviceMappingEvent(type, deviceId)); + } + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/package-info.java new file mode 100644 index 0000000000..d336455995 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of distributed Pipeconf to device mapping store. + */ +package org.onosproject.store.pi.impl; diff --git a/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java new file mode 100644 index 0000000000..83b997d794 --- /dev/null +++ b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.pi.impl; + +import com.google.common.collect.ImmutableSet; +import org.junit.Before; +import org.junit.Test; +import org.onosproject.net.DeviceId; +import org.onosproject.net.pi.model.PiPipeconfId; +import org.onosproject.store.service.TestStorageService; + +import static org.junit.Assert.*; + +/** + * Test class for the Distributed Device to Pipeconf store. + */ +public class DistributedDevicePipeconfMappingStoreTest { + + private DistributedDevicePipeconfMappingStore store; + private static final DeviceId DEVICE_ID = DeviceId.deviceId("foo:bar"); + private static final PiPipeconfId PIPECONF_ID = new PiPipeconfId("foo-pipeconf"); + + /** + * Sets up the device key store and the storage service test harness. + */ + @Before + public void setUp() { + store = new DistributedDevicePipeconfMappingStore(); + store.storageService = new TestStorageService(); + store.setDelegate(event -> { + }); + store.activate(); + } + + /** + * Test for activate. + */ + @Test + public void activate() { + assertNotNull(store.storageService); + assertTrue("Store must have delegate", store.hasDelegate()); + assertTrue("No value should be in the map", store.deviceToPipeconf.isEmpty()); + assertTrue("No value should be in the map", store.pipeconfToDevices.isEmpty()); + } + + /** + * Test for deactivate. + */ + @Test + public void deactivate() { + store.deactivate(); + assertNull("Should be null", store.deviceToPipeconf); + assertNull("Should be null", store.pipeconfToDevices); + } + + /** + * Test for saving the binding in eventually consistent map and in reverse map. + */ + @Test + public void createOrUpdatePipeconfToDeviceBinding() { + store.createOrUpdateBinding(DEVICE_ID, PIPECONF_ID); + assertTrue("Value should be in the map", store.deviceToPipeconf.containsKey(DEVICE_ID)); + assertTrue("Value should be in the map", store.deviceToPipeconf.containsValue(PIPECONF_ID)); + assertTrue("Value should be in the map", store.pipeconfToDevices.containsKey(PIPECONF_ID)); + assertTrue("Value should be in the map", store.pipeconfToDevices.containsValue(ImmutableSet.of(DEVICE_ID))); + } + + /** + * Test for getting the deviceId to pipeconfId binding. + */ + @Test + public void getPipeconfIdDevice() throws Exception { + clear(); + createOrUpdatePipeconfToDeviceBinding(); + assertEquals("Wrong PipeconfId", store.getPipeconfId(DEVICE_ID), PIPECONF_ID); + } + + /** + * Test for getting the pipeconfId to devices binding. + */ + @Test + public void getDevices() { + clear(); + createOrUpdatePipeconfToDeviceBinding(); + assertEquals("Wrong set of DeviceIds", store.getDevices(PIPECONF_ID), ImmutableSet.of(DEVICE_ID)); + + } + + /** + * Test for clearing binding for a given device. + */ + @Test + public void clearDeviceToPipeconfBinding() throws Exception { + clear(); + createOrUpdatePipeconfToDeviceBinding(); + store.removeBinding(DEVICE_ID); + assertFalse("Unexpected DeviceId in the map", store.deviceToPipeconf.containsKey(DEVICE_ID)); + assertTrue("No value should be in the map", store.pipeconfToDevices.get(PIPECONF_ID).isEmpty()); + } + + /** + * Clears the store and revers map. + */ + private void clear() { + store.pipeconfToDevices.clear(); + store.deviceToPipeconf.clear(); + } + +} \ No newline at end of file