From db3af8974afdfdb3c26e749b7c929c3856eceeaa Mon Sep 17 00:00:00 2001 From: Jonathan Hart Date: Mon, 26 Jan 2015 13:19:07 -0800 Subject: [PATCH] Initial implementation of EventuallyConsistentMap. The map uses the gossip schemes to replicate data between instances. It seems to work for basic add and remove use cases right now, no anti-entropy yet. ONOS-844. Change-Id: I7d05a7b532e40c95ab14e2c8911f18514bd0a8ca --- .../onosproject/store/impl/ClockService.java | 32 + .../store/impl/EventuallyConsistentMap.java | 171 +++++ .../impl/EventuallyConsistentMapEvent.java | 71 +++ .../impl/EventuallyConsistentMapImpl.java | 598 ++++++++++++++++++ .../impl/EventuallyConsistentMapListener.java | 30 + .../store/impl/WallclockClockManager.java | 28 + 6 files changed, 930 insertions(+) create mode 100644 core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java create mode 100644 core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java create mode 100644 core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java create mode 100644 core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java create mode 100644 core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java create mode 100644 core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java b/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java new file mode 100644 index 0000000000..4fbfc2216d --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.impl; + +import org.onosproject.store.Timestamp; + +/** + * Clock service that can generate timestamps per object. + */ +public interface ClockService { + + /** + * Gets a new timestamp for the given object. + * + * @param object Object to get a timestamp for + * @return the new timestamp + */ + public Timestamp getTimestamp(T object); +} diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java new file mode 100644 index 0000000000..bbddd1df1a --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java @@ -0,0 +1,171 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.impl; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +/** + * A distributed, eventually consistent map. + * + * This map does not offer read after writes consistency. Operations are + * serialized via the timestamps issued by the clock service. If two updates + * are in conflict, the update with the more recent timestamp will endure. + * + * The interface is mostly similar to {@link java.util.Map} with some minor + * semantic changes and the addition of a listener framework (because the map + * can be mutated by clients on other instances, not only through the local Java + * API). + * + * Clients are expected to register an + * {@link org.onosproject.store.impl.EventuallyConsistentMapListener} if they + * are interested in receiving notifications of update to the map. + */ +public interface EventuallyConsistentMap { + + /** + * Returns the number of key-value mappings in this map. + * + * @return number of key-value mappings + */ + public int size(); + + /** + * Returns true if this map is empty. + * + * @return true if this map is empty, otherwise false + */ + public boolean isEmpty(); + + /** + * Returns true if the map contains a mapping for the specified key. + * + * @param key the key to check if this map contains + * @return true if this map has a mapping for the key, otherwise false + */ + public boolean containsKey(K key); + + /** + * Returns true if the map contains a mapping from any key to the specified + * value. + * + * @param value the value to check if this map has a mapping for + * @return true if this map has a mapping to this value, otherwise false + */ + public boolean containsValue(V value); + + /** + * Returns the value mapped to the specified key. + * + * @param key the key to look up in this map + * @return the value mapped to the key, or null if no mapping is found + */ + public V get(K key); + + /** + * Associates the specified value to the specified key in this map. + *

+ * Note: this differs from the specification of {@link java.util.Map} + * because it does not return the previous value associated with the key. + * Clients are expected to register an + * {@link org.onosproject.store.impl.EventuallyConsistentMapListener} if + * they are interested in receiving notification of updates to the map. + *

+ * + * @param key the key to add a mapping for in this map + * @param value the value to associate with the key in this map + */ + public void put(K key, V value); + + /** + * Removes the mapping associated with the specified key from the map. + *

+ * Note: this differs from the specification of {@link java.util.Map} + * because it does not return the previous value associated with the key. + * Clients are expected to register an + * {@link org.onosproject.store.impl.EventuallyConsistentMapListener} if + * they are interested in receiving notification of updates to the map. + *

+ * + * @param key the key to remove the mapping for + */ + public void remove(K key); + + /** + * Adds mappings for all key-value pairs in the specified map to this map. + *

+ * This will be more efficient in communication than calling individual put + * operations. + *

+ * + * @param m a map of values to add to this map + */ + public void putAll(Map m); + + /** + * Removes all mappings from this map. + */ + public void clear(); + + /** + * Returns a set of the keys in this map. Changes to the set are not + * reflected back to the map. + * + * @return set of keys in the map + */ + public Set keySet(); + + /** + * Returns a collections of values in this map. Changes to the collection + * are not reflected back to the map. + * + * @return collection of values in the map + */ + public Collection values(); + + /** + * Returns a set of mappings contained in this map. Changes to the set are + * not reflected back to the map. + * + * @return set of key-value mappings in this map + */ + public Set> entrySet(); + + /** + * Adds the specified listener to the map which will be notified whenever + * the mappings in the map are changed. + * + * @param listener listener to register for events + */ + public void addListener(EventuallyConsistentMapListener listener); + + /** + * Removes the specified listener from the map such that it will no longer + * receive change notifications. + * + * @param listener listener to deregister for events + */ + public void removeListener(EventuallyConsistentMapListener listener); + + /** + * Shuts down the map and breaks communication between different instances. + * This allows the map objects to be cleaned up and garbage collected. + * Calls to any methods on the map subsequent to calling destroy() will + * throw a {@link java.lang.RuntimeException}. + */ + public void destroy(); +} diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java new file mode 100644 index 0000000000..fd38e646a3 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java @@ -0,0 +1,71 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.impl; + +/** + * Event object signalling that the map was modified. + */ +public class EventuallyConsistentMapEvent { + + public enum Type { + PUT, + REMOVE + } + + private final Type type; + private final K key; + private final V value; + + /** + * Creates a new event object. + * + * @param type the type of the event + * @param key the key the event concerns + * @param value the value related to the key, or null for remove events + */ + public EventuallyConsistentMapEvent(Type type, K key, V value) { + this.type = type; + this.key = key; + this.value = value; + } + + /** + * Returns the type of the event. + * + * @return the type of the event + */ + public Type type() { + return type; + } + + /** + * Returns the key this event concerns. + * + * @return the key + */ + public K key() { + return key; + } + + /** + * Returns the value associated with this event. + * + * @return the value, or null if the event was REMOVE + */ + public V value() { + return value; + } +} diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java new file mode 100644 index 0000000000..b011517fe5 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java @@ -0,0 +1,598 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.impl; + +import com.google.common.base.MoreObjects; +import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.store.Timestamp; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterMessage; +import org.onosproject.store.cluster.messaging.ClusterMessageHandler; +import org.onosproject.store.cluster.messaging.MessageSubject; +import org.onosproject.store.serializers.KryoSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.minPriority; +import static org.onlab.util.Tools.namedThreads; + +/** + * Distributed Map implementation which uses optimistic replication and gossip + * based techniques to provide an eventually consistent data store. + */ +public class EventuallyConsistentMapImpl + implements EventuallyConsistentMap { + + private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class); + + private final Map> items; + private final Map removedItems; + + private final String mapName; + private final ClusterService clusterService; + private final ClusterCommunicationService clusterCommunicator; + private final KryoSerializer serializer; + + private final ClockService clockService; + + private final MessageSubject updateMessageSubject; + private final MessageSubject removeMessageSubject; + + private final Set listeners + = new CopyOnWriteArraySet<>(); + + private final ExecutorService executor; + + private final ScheduledExecutorService backgroundExecutor; + + private volatile boolean destroyed = false; + private static final String ERROR_DESTROYED = " is already destroyed"; + + // TODO: Make these anti-entropy params configurable + private long initialDelaySec = 5; + private long periodSec = 5; + + /** + * Creates a new eventually consistent map shared amongst multiple instances. + * + * Each map is identified by a string map name. EventuallyConsistentMapImpl + * objects in different JVMs that use the same map name will form a + * distributed map across JVMs (provided the cluster service is aware of + * both nodes). + * + * The client is expected to provide an + * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that + * will be stored in this map have been registered (including referenced + * classes). This serializer will be used to serialize both K and V for + * inter-node notifications. + * + * The client must provide an {@link org.onosproject.store.impl.ClockService} + * which can generate timestamps for a given key. The clock service is free + * to generate timestamps however it wishes, however these timestamps will + * be used to serialize updates to the map so they must be strict enough + * to ensure updates are properly ordered for the use case (i.e. in some + * cases wallclock time will suffice, whereas in other cases logical time + * will be necessary). + * + * @param mapName a String identifier for the map. + * @param clusterService the cluster service + * @param clusterCommunicator the cluster communications service + * @param serializerBuilder a Kryo namespace builder that can serialize + * both K and V + * @param clockService a clock service able to generate timestamps + * for K + */ + public EventuallyConsistentMapImpl(String mapName, + ClusterService clusterService, + ClusterCommunicationService clusterCommunicator, + KryoNamespace.Builder serializerBuilder, + ClockService clockService) { + + this.mapName = checkNotNull(mapName); + this.clusterService = checkNotNull(clusterService); + this.clusterCommunicator = checkNotNull(clusterCommunicator); + + serializer = createSerializer(checkNotNull(serializerBuilder)); + + this.clockService = checkNotNull(clockService); + + items = new ConcurrentHashMap<>(); + removedItems = new ConcurrentHashMap<>(); + + executor = Executors + .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d")); + + backgroundExecutor = + newSingleThreadScheduledExecutor(minPriority( + namedThreads("onos-ecm-" + mapName + "-bg-%d"))); + + updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update"); + clusterCommunicator.addSubscriber(updateMessageSubject, + new InternalPutEventListener()); + removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove"); + clusterCommunicator.addSubscriber(removeMessageSubject, + new InternalRemoveEventListener()); + } + + private KryoSerializer createSerializer(KryoNamespace.Builder builder) { + return new KryoSerializer() { + @Override + protected void setupKryoPool() { + // Add the map's internal helper classes to the user-supplied serializer + serializerPool = builder + .register(WallClockTimestamp.class) + .register(PutEntry.class) + .register(ArrayList.class) + .register(InternalPutEvent.class) + .register(InternalRemoveEvent.class) + .build(); + + // TODO anti-entropy classes + } + }; + } + + @Override + public int size() { + checkState(destroyed, mapName + ERROR_DESTROYED); + return items.size(); + } + + @Override + public boolean isEmpty() { + checkState(destroyed, mapName + ERROR_DESTROYED); + return items.isEmpty(); + } + + @Override + public boolean containsKey(K key) { + checkState(destroyed, mapName + ERROR_DESTROYED); + return items.containsKey(key); + } + + @Override + public boolean containsValue(V value) { + checkState(destroyed, mapName + ERROR_DESTROYED); + + return items.values().stream() + .anyMatch(timestamped -> timestamped.value().equals(value)); + } + + @Override + public V get(K key) { + checkState(destroyed, mapName + ERROR_DESTROYED); + + Timestamped value = items.get(key); + if (value != null) { + return value.value(); + } + return null; + } + + @Override + public void put(K key, V value) { + checkState(destroyed, mapName + ERROR_DESTROYED); + + Timestamp timestamp = clockService.getTimestamp(key); + if (putInternal(key, value, timestamp)) { + notifyPeers(new InternalPutEvent<>(key, value, timestamp)); + EventuallyConsistentMapEvent externalEvent + = new EventuallyConsistentMapEvent<>( + EventuallyConsistentMapEvent.Type.PUT, key, value); + notifyListeners(externalEvent); + } + } + + private boolean putInternal(K key, V value, Timestamp timestamp) { + synchronized (this) { + Timestamp removed = removedItems.get(key); + if (removed != null && removed.compareTo(timestamp) > 0) { + return false; + } + + Timestamped existing = items.get(key); + if (existing != null && existing.isNewer(timestamp)) { + return false; + } else { + items.put(key, new Timestamped<>(value, timestamp)); + removedItems.remove(key); + return true; + } + } + } + + @Override + public void remove(K key) { + checkState(destroyed, mapName + ERROR_DESTROYED); + + Timestamp timestamp = clockService.getTimestamp(key); + if (removeInternal(key, timestamp)) { + notifyPeers(new InternalRemoveEvent<>(key, timestamp)); + EventuallyConsistentMapEvent externalEvent + = new EventuallyConsistentMapEvent<>( + EventuallyConsistentMapEvent.Type.REMOVE, key, null); + notifyListeners(externalEvent); + } + } + + private boolean removeInternal(K key, Timestamp timestamp) { + synchronized (this) { + if (items.get(key) != null && items.get(key).isNewer(timestamp)) { + return false; + } + + items.remove(key); + removedItems.put(key, timestamp); + return true; + } + } + + @Override + public void putAll(Map m) { + checkState(destroyed, mapName + ERROR_DESTROYED); + + List> updates = new ArrayList<>(m.size()); + + for (Map.Entry entry : m.entrySet()) { + K key = entry.getKey(); + V value = entry.getValue(); + Timestamp timestamp = clockService.getTimestamp(entry.getKey()); + + if (putInternal(key, value, timestamp)) { + updates.add(new PutEntry<>(key, value, timestamp)); + } + } + + notifyPeers(new InternalPutEvent<>(updates)); + + for (PutEntry entry : updates) { + EventuallyConsistentMapEvent externalEvent = + new EventuallyConsistentMapEvent<>( + EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value()); + notifyListeners(externalEvent); + } + } + + @Override + public void clear() { + checkState(destroyed, mapName + ERROR_DESTROYED); + + List> removed = new ArrayList<>(items.size()); + + for (K key : items.keySet()) { + Timestamp timestamp = clockService.getTimestamp(key); + + if (removeInternal(key, timestamp)) { + removed.add(new RemoveEntry<>(key, timestamp)); + } + } + + notifyPeers(new InternalRemoveEvent<>(removed)); + + for (RemoveEntry entry : removed) { + EventuallyConsistentMapEvent externalEvent = + new EventuallyConsistentMapEvent<>( + EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null); + notifyListeners(externalEvent); + } + } + + @Override + public Set keySet() { + checkState(destroyed, mapName + ERROR_DESTROYED); + + return items.keySet(); + } + + @Override + public Collection values() { + checkState(destroyed, mapName + ERROR_DESTROYED); + + return items.values().stream() + .map(Timestamped::value) + .collect(Collectors.toList()); + } + + @Override + public Set> entrySet() { + checkState(destroyed, mapName + ERROR_DESTROYED); + + return items.entrySet().stream() + .map(e -> new Entry(e.getKey(), e.getValue().value())) + .collect(Collectors.toSet()); + } + + @Override + public void addListener(EventuallyConsistentMapListener listener) { + checkState(destroyed, mapName + ERROR_DESTROYED); + + listeners.add(checkNotNull(listener)); + } + + @Override + public void removeListener(EventuallyConsistentMapListener listener) { + checkState(destroyed, mapName + ERROR_DESTROYED); + + listeners.remove(checkNotNull(listener)); + } + + @Override + public void destroy() { + destroyed = true; + + executor.shutdown(); + backgroundExecutor.shutdown(); + + clusterCommunicator.removeSubscriber(updateMessageSubject); + clusterCommunicator.removeSubscriber(removeMessageSubject); + } + + private void notifyListeners(EventuallyConsistentMapEvent event) { + for (EventuallyConsistentMapListener listener : listeners) { + listener.event(event); + } + } + + private void notifyPeers(InternalPutEvent event) { + try { + log.debug("sending put {}", event); + broadcastMessage(updateMessageSubject, event); + } catch (IOException e) { + // TODO this won't happen; remove from API + log.debug("IOException broadcasting update", e); + } + } + + private void notifyPeers(InternalRemoveEvent event) { + try { + broadcastMessage(removeMessageSubject, event); + } catch (IOException e) { + // TODO this won't happen; remove from API + log.debug("IOException broadcasting update", e); + } + } + + private void broadcastMessage(MessageSubject subject, Object event) throws + IOException { + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + subject, + serializer.encode(event)); + clusterCommunicator.broadcast(message); + } + + private void unicastMessage(NodeId peer, + MessageSubject subject, + Object event) throws IOException { + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + subject, + serializer.encode(event)); + clusterCommunicator.unicast(message, peer); + } + + private final class Entry implements Map.Entry { + + private final K key; + private final V value; + + public Entry(K key, V value) { + this.key = key; + this.value = value; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + } + + private final class InternalPutEventListener implements + ClusterMessageHandler { + @Override + public void handle(ClusterMessage message) { + log.debug("Received put event from peer: {}", message.sender()); + InternalPutEvent event = serializer.decode(message.payload()); + + executor.submit(() -> { + try { + for (PutEntry entry : event.entries()) { + K key = entry.key(); + V value = entry.value(); + Timestamp timestamp = entry.timestamp(); + + if (putInternal(key, value, timestamp)) { + EventuallyConsistentMapEvent externalEvent = + new EventuallyConsistentMapEvent<>( + EventuallyConsistentMapEvent.Type.PUT, key, + value); + notifyListeners(externalEvent); + } + } + } catch (Exception e) { + log.warn("Exception thrown handling put", e); + } + }); + } + } + + private final class InternalRemoveEventListener implements + ClusterMessageHandler { + @Override + public void handle(ClusterMessage message) { + log.debug("Received remove event from peer: {}", message.sender()); + InternalRemoveEvent event = serializer.decode(message.payload()); + + executor.submit(() -> { + try { + for (RemoveEntry entry : event.entries()) { + K key = entry.key(); + Timestamp timestamp = entry.timestamp(); + + if (removeInternal(key, timestamp)) { + EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent( + EventuallyConsistentMapEvent.Type.REMOVE, + key, null); + notifyListeners(externalEvent); + } + } + } catch (Exception e) { + log.warn("Exception thrown handling remove", e); + } + }); + } + } + + private static final class InternalPutEvent { + private final List> entries; + + public InternalPutEvent(K key, V value, Timestamp timestamp) { + entries = Collections + .singletonList(new PutEntry<>(key, value, timestamp)); + } + + public InternalPutEvent(List> entries) { + this.entries = checkNotNull(entries); + } + + // Needed for serialization. + @SuppressWarnings("unused") + private InternalPutEvent() { + entries = null; + } + + public List> entries() { + return entries; + } + } + + private static final class PutEntry { + private final K key; + private final V value; + private final Timestamp timestamp; + + public PutEntry(K key, V value, Timestamp timestamp) { + this.key = checkNotNull(key); + this.value = checkNotNull(value); + this.timestamp = checkNotNull(timestamp); + } + + // Needed for serialization. + @SuppressWarnings("unused") + private PutEntry() { + this.key = null; + this.value = null; + this.timestamp = null; + } + + public K key() { + return key; + } + + public V value() { + return value; + } + + public Timestamp timestamp() { + return timestamp; + } + + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("key", key) + .add("value", value) + .add("timestamp", timestamp) + .toString(); + } + } + + private static final class InternalRemoveEvent { + private final List> entries; + + public InternalRemoveEvent(K key, Timestamp timestamp) { + entries = Collections.singletonList( + new RemoveEntry<>(key, timestamp)); + } + + public InternalRemoveEvent(List> entries) { + this.entries = checkNotNull(entries); + } + + // Needed for serialization. + @SuppressWarnings("unused") + private InternalRemoveEvent() { + entries = null; + } + + public List> entries() { + return entries; + } + } + + private static final class RemoveEntry { + private final K key; + private final Timestamp timestamp; + + public RemoveEntry(K key, Timestamp timestamp) { + this.key = checkNotNull(key); + this.timestamp = checkNotNull(timestamp); + } + + // Needed for serialization. + @SuppressWarnings("unused") + private RemoveEntry() { + this.key = null; + this.timestamp = null; + } + + public K key() { + return key; + } + + public Timestamp timestamp() { + return timestamp; + } + } +} diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java new file mode 100644 index 0000000000..289f46c7f4 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java @@ -0,0 +1,30 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.impl; + +/** + * Listener interested in receiving modification events for an + * EventuallyConsistentMap. + */ +public interface EventuallyConsistentMapListener { + + /** + * Reacts to the specified event. + * + * @param event the event + */ + public void event(EventuallyConsistentMapEvent event); +} diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java new file mode 100644 index 0000000000..0103ee4e36 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java @@ -0,0 +1,28 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.impl; + +import org.onosproject.store.Timestamp; + +/** + * A clock service which hands out wallclock-based timestamps. + */ +public class WallclockClockManager implements ClockService { + @Override + public Timestamp getTimestamp(T object) { + return new WallClockTimestamp(); + } +}