diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java index 34e3d31781..f8a25cb1c1 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java @@ -13,7 +13,7 @@ public class FlowRuleBatchRequest { private final List toAdd; private final List toRemove; - public FlowRuleBatchRequest(int batchId, List toAdd, List toRemove) { + public FlowRuleBatchRequest(int batchId, List toAdd, List toRemove) { this.batchId = batchId; this.toAdd = Collections.unmodifiableList(toAdd); this.toRemove = Collections.unmodifiableList(toRemove); diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java index 30b9008cd5..69b6743333 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java @@ -1,5 +1,6 @@ package org.onlab.onos.store.flow.impl; +import static com.google.common.base.Preconditions.checkNotNull; import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; import static org.slf4j.LoggerFactory.getLogger; import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*; @@ -10,6 +11,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -27,6 +29,7 @@ import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onlab.onos.cluster.ClusterService; +import org.onlab.onos.cluster.NodeId; import org.onlab.onos.net.Device; import org.onlab.onos.net.DeviceId; import org.onlab.onos.net.device.DeviceService; @@ -34,6 +37,7 @@ import org.onlab.onos.net.flow.CompletedBatchOperation; import org.onlab.onos.net.flow.DefaultFlowEntry; import org.onlab.onos.net.flow.FlowEntry; import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; +import org.onlab.onos.net.flow.FlowId; import org.onlab.onos.net.flow.FlowRule; import org.onlab.onos.net.flow.FlowRuleBatchEntry; import org.onlab.onos.net.flow.FlowRuleBatchEvent; @@ -45,12 +49,15 @@ import org.onlab.onos.net.flow.FlowRuleEvent.Type; import org.onlab.onos.net.flow.FlowRuleStore; import org.onlab.onos.net.flow.FlowRuleStoreDelegate; import org.onlab.onos.net.flow.StoredFlowEntry; -import org.onlab.onos.store.AbstractStore; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; import org.onlab.onos.store.flow.ReplicaInfo; +import org.onlab.onos.store.flow.ReplicaInfoEvent; +import org.onlab.onos.store.flow.ReplicaInfoEventListener; import org.onlab.onos.store.flow.ReplicaInfoService; +import org.onlab.onos.store.hz.AbstractHazelcastStore; +import org.onlab.onos.store.hz.SMap; import org.onlab.onos.store.serializers.DistributedStoreSerializers; import org.onlab.onos.store.serializers.KryoSerializer; import org.onlab.util.KryoNamespace; @@ -59,13 +66,17 @@ import org.slf4j.Logger; import com.google.common.base.Function; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.hazelcast.core.IMap; /** * Manages inventory of flow rules using a distributed state management protocol. @@ -73,7 +84,7 @@ import com.google.common.util.concurrent.SettableFuture; @Component(immediate = true) @Service public class DistributedFlowRuleStore - extends AbstractStore + extends AbstractHazelcastStore implements FlowRuleStore { private final Logger log = getLogger(getClass()); @@ -82,8 +93,6 @@ public class DistributedFlowRuleStore private final Multimap flowEntries = ArrayListMultimap.create(); - private final Multimap flowEntriesById = - ArrayListMultimap.create(); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ReplicaInfoService replicaInfoManager; @@ -109,10 +118,17 @@ public class DistributedFlowRuleStore //.removalListener(listener) .build(); + private LoadingCache>> smaps; + private final ExecutorService futureListeners = Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders")); + private final ExecutorService backupExecutors = + Executors.newSingleThreadExecutor(namedThreads("async-backups")); + + // TODO make this configurable + private boolean syncBackup = false; protected static final KryoSerializer SERIALIZER = new KryoSerializer() { @Override @@ -127,8 +143,20 @@ public class DistributedFlowRuleStore // TODO: make this configurable private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000; + private ReplicaInfoEventListener replicaInfoEventListener; + + @Override @Activate public void activate() { + + super.serializer = SERIALIZER; + super.theInstance = storeService.getHazelcastInstance(); + + // Cache to create SMap on demand + smaps = CacheBuilder.newBuilder() + .softValues() + .build(new SMapLoader()); + clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() { @Override @@ -182,11 +210,16 @@ public class DistributedFlowRuleStore } }); + replicaInfoEventListener = new InternalReplicaInfoEventListener(); + + replicaInfoManager.addListener(replicaInfoEventListener); + log.info("Started"); } @Deactivate public void deactivate() { + replicaInfoManager.removeListener(replicaInfoEventListener); log.info("Stopped"); } @@ -276,8 +309,10 @@ public class DistributedFlowRuleStore storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)))); } + // FIXME document that all of the FlowEntries must be about same device @Override public Future storeBatch(FlowRuleBatchOperation operation) { + if (operation.getOperations().isEmpty()) { return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet())); } @@ -313,12 +348,17 @@ public class DistributedFlowRuleStore } private ListenableFuture storeBatchInternal(FlowRuleBatchOperation operation) { - List toRemove = new ArrayList<>(); - List toAdd = new ArrayList<>(); - // TODO: backup changes to hazelcast map + final List toRemove = new ArrayList<>(); + final List toAdd = new ArrayList<>(); + DeviceId did = null; + + for (FlowRuleBatchEntry batchEntry : operation.getOperations()) { FlowRule flowRule = batchEntry.getTarget(); FlowRuleOperation op = batchEntry.getOperator(); + if (did == null) { + did = flowRule.deviceId(); + } if (op.equals(FlowRuleOperation.REMOVE)) { StoredFlowEntry entry = getFlowEntryInternal(flowRule); if (entry != null) { @@ -330,7 +370,6 @@ public class DistributedFlowRuleStore DeviceId deviceId = flowRule.deviceId(); if (!flowEntries.containsEntry(deviceId, flowEntry)) { flowEntries.put(deviceId, flowEntry); - flowEntriesById.put(flowRule.appId(), flowEntry); toAdd.add(flowEntry); } } @@ -339,14 +378,39 @@ public class DistributedFlowRuleStore return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet())); } + // create remote backup copies + final DeviceId deviceId = did; + updateBackup(deviceId, toAdd, toRemove); + SettableFuture r = SettableFuture.create(); final int batchId = localBatchIdGen.incrementAndGet(); pendingFutures.put(batchId, r); notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove))); + return r; } + private void updateBackup(final DeviceId deviceId, + final List toAdd, + final List list) { + + Future submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list)); + + if (syncBackup) { + // wait for backup to complete + try { + submit.get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to create backups", e); + } + } + } + + private void updateBackup(DeviceId deviceId, List toAdd) { + updateBackup(deviceId, toAdd, Collections.emptyList()); + } + @Override public void deleteFlowRule(FlowRule rule) { storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule)))); @@ -365,7 +429,7 @@ public class DistributedFlowRuleStore } private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) { - DeviceId did = rule.deviceId(); + final DeviceId did = rule.deviceId(); // check if this new rule is an update to an existing entry StoredFlowEntry stored = getFlowEntryInternal(rule); @@ -375,16 +439,18 @@ public class DistributedFlowRuleStore stored.setPackets(rule.packets()); if (stored.state() == FlowEntryState.PENDING_ADD) { stored.setState(FlowEntryState.ADDED); + // update backup. + updateBackup(did, Arrays.asList(stored)); return new FlowRuleEvent(Type.RULE_ADDED, rule); } return new FlowRuleEvent(Type.RULE_UPDATED, rule); } // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore + // TODO: also update backup. flowEntries.put(did, new DefaultFlowEntry(rule)); return null; - // TODO: also update backup. } @Override @@ -401,13 +467,15 @@ public class DistributedFlowRuleStore } private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) { + final DeviceId deviceId = rule.deviceId(); // This is where one could mark a rule as removed and still keep it in the store. - if (flowEntries.remove(rule.deviceId(), rule)) { + final boolean removed = flowEntries.remove(deviceId, rule); + updateBackup(deviceId, Collections.emptyList(), Arrays.asList(rule)); + if (removed) { return new FlowRuleEvent(RULE_REMOVED, rule); } else { return null; } - // TODO: also update backup. } @Override @@ -421,4 +489,145 @@ public class DistributedFlowRuleStore } notifyDelegate(event); } + + private synchronized void loadFromBackup(final DeviceId did) { + // should relax synchronized condition + + try { + log.info("Loading FlowRules for {} from backups", did); + SMap> backupFlowTable = smaps.get(did); + for (Entry> e + : backupFlowTable.entrySet()) { + + // TODO: should we be directly updating internal structure or + // should we be triggering event? + log.debug("loading {}", e.getValue()); + for (StoredFlowEntry entry : e.getValue()) { + flowEntries.remove(did, entry); + flowEntries.put(did, entry); + } + } + } catch (ExecutionException e) { + log.error("Failed to load backup flowtable for {}", did, e); + } + } + + private synchronized void removeFromPrimary(final DeviceId did) { + Collection removed = flowEntries.removeAll(did); + log.debug("removedFromPrimary {}", removed); + } + + private final class SMapLoader + extends CacheLoader>> { + + @Override + public SMap> load(DeviceId id) + throws Exception { + IMap map = theInstance.getMap("flowtable_" + id.toString()); + return new SMap>(map, SERIALIZER); + } + } + + private final class InternalReplicaInfoEventListener + implements ReplicaInfoEventListener { + + @Override + public void event(ReplicaInfoEvent event) { + final NodeId local = clusterService.getLocalNode().id(); + final DeviceId did = event.subject(); + final ReplicaInfo rInfo = event.replicaInfo(); + + switch (event.type()) { + case MASTER_CHANGED: + if (local.equals(rInfo.master().orNull())) { + // This node is the new master, populate local structure + // from backup + loadFromBackup(did); + } else { + // This node is no longer the master holder, + // clean local structure + removeFromPrimary(did); + // FIXME: probably should stop pending backup activities in + // executors to avoid overwriting with old value + } + break; + default: + break; + + } + } + } + + // Task to update FlowEntries in backup HZ store + private final class UpdateBackup implements Runnable { + + private final DeviceId deviceId; + private final List toAdd; + private final List toRemove; + + public UpdateBackup(DeviceId deviceId, + List toAdd, + List list) { + this.deviceId = checkNotNull(deviceId); + this.toAdd = checkNotNull(toAdd); + this.toRemove = checkNotNull(list); + } + + @Override + public void run() { + try { + log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove); + final SMap> backupFlowTable = smaps.get(deviceId); + // Following should be rewritten using async APIs + for (StoredFlowEntry entry : toAdd) { + final FlowId id = entry.id(); + ImmutableList original = backupFlowTable.get(id); + List list = new ArrayList<>(); + if (original != null) { + list.addAll(original); + } + + list.remove(entry); + list.add(entry); + + ImmutableList newValue = ImmutableList.copyOf(list); + boolean success; + if (original == null) { + success = (backupFlowTable.putIfAbsent(id, newValue) == null); + } else { + success = backupFlowTable.replace(id, original, newValue); + } + // TODO retry? + if (!success) { + log.error("Updating backup failed."); + } + } + for (FlowRule entry : toRemove) { + final FlowId id = entry.id(); + ImmutableList original = backupFlowTable.get(id); + List list = new ArrayList<>(); + if (original != null) { + list.addAll(original); + } + + list.remove(entry); + + ImmutableList newValue = ImmutableList.copyOf(list); + boolean success; + if (original == null) { + success = (backupFlowTable.putIfAbsent(id, newValue) == null); + } else { + success = backupFlowTable.replace(id, original, newValue); + } + // TODO retry? + if (!success) { + log.error("Updating backup failed."); + } + } + } catch (ExecutionException e) { + log.error("Failed to write to backups", e); + } + + } + } }