Move flow count updates off write path in ECFlowRuleStore

Change-Id: I44c611625baec124a45524ddb39fbe74f4c3c907
This commit is contained in:
Jordan Halterman 2018-06-12 11:23:33 -07:00 committed by Ray Milkey
parent a765d22222
commit 8f90d6d707

View File

@ -31,7 +31,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@ -56,11 +60,6 @@ import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowEntry.FlowEntryState;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
import org.onosproject.net.flow.FlowRuleService;
@ -68,6 +67,11 @@ import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@ -87,11 +91,6 @@ import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
@ -323,7 +322,7 @@ public class ECFlowRuleStore
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
clusterCommunicator.addSubscriber(
FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
}
private void unregisterMessageHandlers() {
@ -851,12 +850,6 @@ public class ECFlowRuleStore
private void recordUpdate(BucketId bucketId) {
lastUpdateTimes.put(bucketId, System.currentTimeMillis());
FlowBucket flowBucket = getFlowBucket(bucketId);
int flowCount = flowBucket.table().entrySet()
.stream()
.mapToInt(e -> e.getValue().values().size())
.sum();
flowCounts.put(bucketId, flowCount);
}
public void add(FlowEntry rule) {
@ -925,37 +918,63 @@ public class ECFlowRuleStore
flowEntries.clear();
}
/**
* Returns a boolean indicating whether the local node is the current master for the given device.
*
* @param deviceId the device for which to indicate whether the local node is the current master
* @return indicates whether the local node is the current master for the given device
*/
private boolean isMasterNode(DeviceId deviceId) {
NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
return Objects.equals(master, clusterService.getLocalNode().id());
}
private boolean isBackupNode(NodeId nodeId, DeviceId deviceId) {
List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
return allPossibleBackupNodes.indexOf(nodeId) < backupCount;
}
/**
* Backs up all devices to all backup nodes.
*/
private void backup() {
clusterService.getNodes().stream()
.filter(node -> !node.id().equals(clusterService.getLocalNode().id()))
.forEach(node -> {
try {
backup(node.id());
} catch (Exception e) {
log.error("Backup failed.", e);
}
});
for (DeviceId deviceId : flowEntries.keySet()) {
backup(deviceId);
}
}
private void backup(NodeId nodeId) {
for (DeviceId deviceId : flowEntries.keySet()) {
if (isMasterNode(deviceId) && isBackupNode(nodeId, deviceId)) {
backup(nodeId, deviceId);
/**
* Backs up all buckets in the given device to the given node.
*
* @param deviceId the device to back up
*/
private void backup(DeviceId deviceId) {
if (!isMasterNode(deviceId)) {
return;
}
// Get a list of backup nodes for the device.
List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
int availableBackupCount = Math.min(backupCount, backupNodes.size());
// If the list of backup nodes is empty, update the flow count.
if (availableBackupCount == 0) {
updateDeviceFlowCounts(deviceId);
} else {
// Otherwise, iterate through backup nodes and backup the device.
for (int index = 0; index < availableBackupCount; index++) {
NodeId backupNode = backupNodes.get(index);
try {
backup(deviceId, backupNode);
} catch (Exception e) {
log.error("Backup of " + deviceId + " to " + backupNode + " failed", e);
}
}
}
}
private void backup(NodeId nodeId, DeviceId deviceId) {
/**
* Backs up all buckets for the given device to the given node.
*
* @param deviceId the device to back up
* @param nodeId the node to which to back up the device
*/
private void backup(DeviceId deviceId, NodeId nodeId) {
final long timestamp = System.currentTimeMillis();
for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
BucketId bucketId = new BucketId(deviceId, bucket);
@ -967,27 +986,56 @@ public class ECFlowRuleStore
} else {
failBackup(operation);
}
backup(nodeId);
backup(deviceId, nodeId);
}, backupSenderExecutor);
}
}
}
/**
* Returns a boolean indicating whether the given {@link BackupOperation} can be started.
* <p>
* The backup can be started if no backup for the same device/bucket/node is already in progress and changes
* are pending replication for the backup operation.
*
* @param operation the operation to start
* @return indicates whether the given backup operation should be started
*/
private boolean startBackup(BackupOperation operation) {
long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
}
/**
* Fails the given backup operation.
*
* @param operation the backup operation to fail
*/
private void failBackup(BackupOperation operation) {
inFlightUpdates.remove(operation);
}
/**
* Succeeds the given backup operation.
* <p>
* The last backup time for the operation will be updated and the operation will be removed from
* in-flight updates.
*
* @param operation the operation to succeed
* @param timestamp the timestamp at which the operation was <em>started</em>
*/
private void succeedBackup(BackupOperation operation, long timestamp) {
inFlightUpdates.remove(operation);
lastBackupTimes.put(operation, timestamp);
inFlightUpdates.remove(operation);
}
/**
* Performs the given backup operation.
*
* @param operation the operation to perform
* @return a future to be completed with a boolean indicating whether the backup operation was successful
*/
private CompletableFuture<Boolean> backup(BackupOperation operation) {
log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
@ -1010,10 +1058,18 @@ public class ECFlowRuleStore
}
future.complete(backedupFlows != null);
});
updateFlowCounts(flowBucket);
return future;
}
private Set<FlowId> onBackupReceipt(FlowBucket flowBucket) {
/**
* Handles a flow bucket backup from a remote peer.
*
* @param flowBucket the flow bucket to back up
* @return the set of flows that could not be backed up
*/
private Set<FlowId> onBackup(FlowBucket flowBucket) {
log.debug("Received flowEntries for {} bucket {} to backup",
flowBucket.bucketId().deviceId(), flowBucket.bucketId);
Set<FlowId> backedupFlows = Sets.newHashSet();
@ -1036,6 +1092,32 @@ public class ECFlowRuleStore
}
return backedupFlows;
}
/**
* Updates all flow counts for the given device.
*
* @param deviceId the device for which to update flow counts
*/
private void updateDeviceFlowCounts(DeviceId deviceId) {
for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
BucketId bucketId = new BucketId(deviceId, bucket);
FlowBucket flowBucket = getFlowBucket(bucketId);
updateFlowCounts(flowBucket);
}
}
/**
* Updates the eventually consistent flow count for the given bucket.
*
* @param flowBucket the flow bucket for which to update flow counts
*/
private void updateFlowCounts(FlowBucket flowBucket) {
int flowCount = flowBucket.table().entrySet()
.stream()
.mapToInt(e -> e.getValue().values().size())
.sum();
flowCounts.put(flowBucket.bucketId(), flowCount);
}
}
@Override