From 902d41b8cd82b90b88a98a69b77873d9a38a6f36 Mon Sep 17 00:00:00 2001 From: alshabib Date: Tue, 7 Oct 2014 16:52:05 -0700 Subject: [PATCH] initial working impl of batch operations Change-Id: Ie970543dec1104a394c7bcfa6eec24c0538278d6 --- .../net/flow/CompletedBatchOperation.java | 6 + .../org/onlab/onos/net/flow/FlowRule.java | 3 +- .../onos/net/flow/FlowRuleBatchEntry.java | 20 +++ .../onos/net/flow/FlowRuleBatchOperation.java | 13 ++ .../onlab/onos/net/flow/FlowRuleProvider.java | 5 + .../onlab/onos/net/flow/FlowRuleService.java | 9 +- .../onlab/onos/net/intent/BatchOperation.java | 13 +- .../onos/net/intent/BatchOperationEntry.java | 9 +- .../onos/net/flow/impl/FlowRuleManager.java | 100 +++++++++++ .../net/intent/impl/PathIntentInstaller.java | 33 +++- .../net/flow/impl/FlowRuleManagerTest.java | 10 ++ .../provider/of/flow/impl/FlowModBuilder.java | 20 ++- .../of/flow/impl/OpenFlowRuleProvider.java | 169 +++++++++++++++++- 13 files changed, 389 insertions(+), 21 deletions(-) create mode 100644 core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java create mode 100644 core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java create mode 100644 core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java new file mode 100644 index 0000000000..bde752e0b1 --- /dev/null +++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java @@ -0,0 +1,6 @@ +package org.onlab.onos.net.flow; + +public class CompletedBatchOperation { + + +} diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java index 8b30c74756..410aed4783 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java @@ -2,12 +2,13 @@ package org.onlab.onos.net.flow; import org.onlab.onos.ApplicationId; import org.onlab.onos.net.DeviceId; +import org.onlab.onos.net.intent.BatchOperationTarget; /** * Represents a generalized match & action pair to be applied to * an infrastucture device. */ -public interface FlowRule { +public interface FlowRule extends BatchOperationTarget { static final int MAX_TIMEOUT = 60; static final int MIN_PRIORITY = 0; diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java new file mode 100644 index 0000000000..d5a1472d33 --- /dev/null +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java @@ -0,0 +1,20 @@ +package org.onlab.onos.net.flow; + +import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation; +import org.onlab.onos.net.intent.BatchOperationEntry; + + +public class FlowRuleBatchEntry + extends BatchOperationEntry { + + public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) { + super(operator, target); + } + + public enum FlowRuleOperation { + ADD, + REMOVE, + MODIFY + } + +} diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java new file mode 100644 index 0000000000..74ef165e1b --- /dev/null +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java @@ -0,0 +1,13 @@ +package org.onlab.onos.net.flow; + +import java.util.Collection; + +import org.onlab.onos.net.intent.BatchOperation; + +public class FlowRuleBatchOperation + extends BatchOperation { + + public FlowRuleBatchOperation(Collection operations) { + super(operations); + } +} diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java index c4e2f926d3..68762ac6a2 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java @@ -1,6 +1,9 @@ package org.onlab.onos.net.flow; +import java.util.concurrent.Future; + import org.onlab.onos.ApplicationId; +import org.onlab.onos.net.intent.BatchOperation; import org.onlab.onos.net.provider.Provider; /** @@ -34,4 +37,6 @@ public interface FlowRuleProvider extends Provider { */ void removeRulesById(ApplicationId id, FlowRule... flowRules); + Future executeBatch(BatchOperation batch); + } diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java index 8600c5440c..6d04810fde 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java @@ -1,5 +1,7 @@ package org.onlab.onos.net.flow; +import java.util.concurrent.Future; + import org.onlab.onos.ApplicationId; import org.onlab.onos.net.DeviceId; @@ -66,7 +68,12 @@ public interface FlowRuleService { */ Iterable getFlowRulesById(ApplicationId id); - //Future applyBatch(BatchOperation) + /** + * Applies a batch operation of FlowRules. + * + * @return future indicating the state of the batch operation + */ + Future applyBatch(FlowRuleBatchOperation batch); /** * Adds the specified flow rule listener. diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java index 5d0cbb8950..72a9847851 100644 --- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java +++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java @@ -1,12 +1,13 @@ package org.onlab.onos.net.intent; //TODO is this the right package? +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import static com.google.common.base.Preconditions.checkNotNull; - /** * A list of BatchOperationEntry. * @@ -15,7 +16,7 @@ import static com.google.common.base.Preconditions.checkNotNull; */ public abstract class BatchOperation> { - private List ops; + private final List ops; /** * Creates new {@link BatchOperation} object. @@ -30,7 +31,7 @@ public abstract class BatchOperation> { * * @param batchOperations the list of batch operation entries. */ - public BatchOperation(List batchOperations) { + public BatchOperation(Collection batchOperations) { ops = new LinkedList<>(checkNotNull(batchOperations)); } @@ -61,6 +62,10 @@ public abstract class BatchOperation> { /** * Adds an operation. + * FIXME: Brian promises that the Intent Framework + * will not modify the batch operation after it has submitted it. + * Ali would prefer immutablity, but trusts brian for better or + * for worse. * * @param entry the operation to be added * @return this object if succeeded, null otherwise diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java index b5dfa88f3b..4e57d335a7 100644 --- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java +++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java @@ -15,14 +15,7 @@ public class BatchOperationEntry, U extends BatchOperationTarg private final T operator; private final U target; - /** - * Default constructor for serializer. - */ - @Deprecated - protected BatchOperationEntry() { - this.operator = null; - this.target = null; - } + /** * Constructs new instance for the entry of the BatchOperation. diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java index ce11cea33a..a9eddd8ce1 100644 --- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java +++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java @@ -5,6 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -18,8 +22,11 @@ import org.onlab.onos.event.EventDeliveryService; import org.onlab.onos.net.Device; import org.onlab.onos.net.DeviceId; import org.onlab.onos.net.device.DeviceService; +import org.onlab.onos.net.flow.CompletedBatchOperation; import org.onlab.onos.net.flow.FlowEntry; import org.onlab.onos.net.flow.FlowRule; +import org.onlab.onos.net.flow.FlowRuleBatchEntry; +import org.onlab.onos.net.flow.FlowRuleBatchOperation; import org.onlab.onos.net.flow.FlowRuleEvent; import org.onlab.onos.net.flow.FlowRuleListener; import org.onlab.onos.net.flow.FlowRuleProvider; @@ -32,7 +39,9 @@ import org.onlab.onos.net.provider.AbstractProviderRegistry; import org.onlab.onos.net.provider.AbstractProviderService; import org.slf4j.Logger; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; /** * Provides implementation of the flow NB & SB APIs. @@ -130,6 +139,38 @@ public class FlowRuleManager return store.getFlowRulesByAppId(id); } + @Override + public Future applyBatch( + FlowRuleBatchOperation batch) { + Multimap batches = + ArrayListMultimap.create(); + List> futures = Lists.newArrayList(); + for (FlowRuleBatchEntry fbe : batch.getOperations()) { + final FlowRule f = fbe.getTarget(); + final Device device = deviceService.getDevice(f.deviceId()); + final FlowRuleProvider frp = getProvider(device.providerId()); + batches.put(frp, fbe); + switch (fbe.getOperator()) { + case ADD: + store.storeFlowRule(f); + break; + case REMOVE: + store.deleteFlowRule(f); + break; + case MODIFY: + default: + log.error("Batch operation type {} unsupported.", fbe.getOperator()); + } + } + for (FlowRuleProvider provider : batches.keySet()) { + FlowRuleBatchOperation b = + new FlowRuleBatchOperation(batches.get(provider)); + Future future = provider.executeBatch(b); + futures.add(future); + } + return new FlowRuleBatchFuture(futures); + } + @Override public void addListener(FlowRuleListener listener) { listenerRegistry.addListener(listener); @@ -296,4 +337,63 @@ public class FlowRuleManager eventDispatcher.post(event); } } + + private class FlowRuleBatchFuture + implements Future { + + private final List> futures; + + public FlowRuleBatchFuture(List> futures) { + this.futures = futures; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isCancelled() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isDone() { + boolean isDone = true; + for (Future future : futures) { + isDone &= future.isDone(); + } + return isDone; + } + + @Override + public CompletedBatchOperation get() throws InterruptedException, + ExecutionException { + // TODO Auto-generated method stub + for (Future future : futures) { + future.get(); + } + return new CompletedBatchOperation(); + } + + @Override + public CompletedBatchOperation get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, + TimeoutException { + // TODO we should decrement the timeout + long start = System.nanoTime(); + long end = start + unit.toNanos(timeout); + for (Future future : futures) { + long now = System.nanoTime(); + long thisTimeout = end - now; + future.get(thisTimeout, TimeUnit.NANOSECONDS); + } + return new CompletedBatchOperation(); + } + + } + + } diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java index a0995e4dc9..0ca75c2f45 100644 --- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java +++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java @@ -4,6 +4,8 @@ import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder; import static org.slf4j.LoggerFactory.getLogger; import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -16,6 +18,9 @@ import org.onlab.onos.net.Link; import org.onlab.onos.net.flow.DefaultFlowRule; import org.onlab.onos.net.flow.DefaultTrafficSelector; import org.onlab.onos.net.flow.FlowRule; +import org.onlab.onos.net.flow.FlowRuleBatchEntry; +import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation; +import org.onlab.onos.net.flow.FlowRuleBatchOperation; import org.onlab.onos.net.flow.FlowRuleService; import org.onlab.onos.net.flow.TrafficSelector; import org.onlab.onos.net.flow.TrafficTreatment; @@ -24,6 +29,8 @@ import org.onlab.onos.net.intent.IntentInstaller; import org.onlab.onos.net.intent.PathIntent; import org.slf4j.Logger; +import com.google.common.collect.Lists; + /** * Installer for {@link PathIntent path connectivity intents}. */ @@ -56,19 +63,27 @@ public class PathIntentInstaller implements IntentInstaller { DefaultTrafficSelector.builder(intent.selector()); Iterator links = intent.path().links().iterator(); ConnectPoint prev = links.next().dst(); - + List rules = Lists.newLinkedList(); while (links.hasNext()) { builder.matchInport(prev.port()); Link link = links.next(); TrafficTreatment treatment = builder() .setOutput(link.src().port()).build(); + FlowRule rule = new DefaultFlowRule(link.src().deviceId(), builder.build(), treatment, 123, appId, 600); - flowRuleService.applyFlowRules(rule); + rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)); + //flowRuleService.applyFlowRules(rule); prev = link.dst(); } - + FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules); + try { + flowRuleService.applyBatch(batch).get(); + } catch (InterruptedException | ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } @Override @@ -77,6 +92,7 @@ public class PathIntentInstaller implements IntentInstaller { DefaultTrafficSelector.builder(intent.selector()); Iterator links = intent.path().links().iterator(); ConnectPoint prev = links.next().dst(); + List rules = Lists.newLinkedList(); while (links.hasNext()) { builder.matchInport(prev.port()); @@ -86,9 +102,16 @@ public class PathIntentInstaller implements IntentInstaller { FlowRule rule = new DefaultFlowRule(link.src().deviceId(), builder.build(), treatment, 123, appId, 600); - - flowRuleService.removeFlowRules(rule); + rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule)); + //flowRuleService.removeFlowRules(rule); prev = link.dst(); } + FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules); + try { + flowRuleService.applyBatch(batch).get(); + } catch (InterruptedException | ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } } diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java index 74636718f5..86f3ddc6cb 100644 --- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java +++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java @@ -12,6 +12,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.Future; import org.junit.After; import org.junit.Before; @@ -32,6 +33,7 @@ import org.onlab.onos.net.flow.DefaultFlowRule; import org.onlab.onos.net.flow.FlowEntry; import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; import org.onlab.onos.net.flow.FlowRule; +import org.onlab.onos.net.flow.FlowRuleBatchEntry; import org.onlab.onos.net.flow.FlowRuleEvent; import org.onlab.onos.net.flow.FlowRuleListener; import org.onlab.onos.net.flow.FlowRuleProvider; @@ -42,6 +44,7 @@ import org.onlab.onos.net.flow.TrafficSelector; import org.onlab.onos.net.flow.TrafficTreatment; import org.onlab.onos.net.flow.criteria.Criterion; import org.onlab.onos.net.flow.instructions.Instruction; +import org.onlab.onos.net.intent.BatchOperation; import org.onlab.onos.net.provider.AbstractProvider; import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore; @@ -404,6 +407,13 @@ public class FlowRuleManagerTest { public void removeRulesById(ApplicationId id, FlowRule... flowRules) { } + @Override + public Future executeBatch( + BatchOperation batch) { + // TODO Auto-generated method stub + return null; + } + } diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java index ade651ed8d..78f58748dc 100644 --- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java +++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java @@ -68,7 +68,7 @@ public class FlowModBuilder { this.cookie = flowRule.id(); } - public OFFlowMod buildFlowMod() { + public OFFlowMod buildFlowAdd() { Match match = buildMatch(); List actions = buildActions(); @@ -86,6 +86,24 @@ public class FlowModBuilder { } + public OFFlowMod buildFlowMod() { + Match match = buildMatch(); + List actions = buildActions(); + + //TODO: what to do without bufferid? do we assume that there will be a pktout as well? + OFFlowMod fm = factory.buildFlowModify() + .setCookie(U64.of(cookie.value())) + .setBufferId(OFBufferId.NO_BUFFER) + .setActions(actions) + .setMatch(match) + .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM)) + .setPriority(priority) + .build(); + + return fm; + + } + public OFFlowMod buildFlowDel() { Match match = buildMatch(); List actions = buildActions(); diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java index eac3c186f7..0aca754a42 100644 --- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java +++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java @@ -2,8 +2,17 @@ package org.onlab.onos.provider.of.flow.impl; import static org.slf4j.LoggerFactory.getLogger; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -14,9 +23,11 @@ import org.onlab.onos.ApplicationId; import org.onlab.onos.net.DeviceId; import org.onlab.onos.net.flow.FlowEntry; import org.onlab.onos.net.flow.FlowRule; +import org.onlab.onos.net.flow.FlowRuleBatchEntry; import org.onlab.onos.net.flow.FlowRuleProvider; import org.onlab.onos.net.flow.FlowRuleProviderRegistry; import org.onlab.onos.net.flow.FlowRuleProviderService; +import org.onlab.onos.net.intent.BatchOperation; import org.onlab.onos.net.provider.AbstractProvider; import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.net.topology.TopologyService; @@ -27,6 +38,8 @@ import org.onlab.onos.openflow.controller.OpenFlowSwitch; import org.onlab.onos.openflow.controller.OpenFlowSwitchListener; import org.onlab.onos.openflow.controller.RoleState; import org.projectfloodlight.openflow.protocol.OFActionType; +import org.projectfloodlight.openflow.protocol.OFBarrierRequest; +import org.projectfloodlight.openflow.protocol.OFErrorMsg; import org.projectfloodlight.openflow.protocol.OFFlowRemoved; import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry; import org.projectfloodlight.openflow.protocol.OFFlowStatsReply; @@ -42,9 +55,11 @@ import org.projectfloodlight.openflow.protocol.action.OFActionOutput; import org.projectfloodlight.openflow.protocol.instruction.OFInstruction; import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions; import org.projectfloodlight.openflow.types.OFPort; +import org.projectfloodlight.openflow.types.U32; import org.slf4j.Logger; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -70,6 +85,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr private final InternalFlowProvider listener = new InternalFlowProvider(); + private final Map pendingFutures = + new ConcurrentHashMap(); + /** * Creates an OpenFlow host provider. */ @@ -101,7 +119,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr private void applyRule(FlowRule flowRule) { OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri())); - sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod()); + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd()); } @@ -154,6 +172,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr @Override public void handleMessage(Dpid dpid, OFMessage msg) { + InstallationFuture future = null; switch (msg.getType()) { case FLOW_REMOVED: //TODO: make this better @@ -166,7 +185,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr pushFlowMetrics(dpid, (OFStatsReply) msg); break; case BARRIER_REPLY: + future = pendingFutures.get(msg.getXid()); + if (future != null) { + future.satisfyRequirement(dpid); + } + break; case ERROR: + future = pendingFutures.get(msg.getXid()); + if (future != null) { + future.fail((OFErrorMsg) msg, dpid); + } + break; default: log.debug("Unhandled message type: {}", msg.getType()); } @@ -226,6 +255,144 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr } + @Override + public Future executeBatch(BatchOperation batch) { + final Set sws = new HashSet(); + for (FlowRuleBatchEntry fbe : batch.getOperations()) { + FlowRule flowRule = fbe.getTarget(); + OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri())); + sws.add(new Dpid(sw.getId())); + switch (fbe.getOperator()) { + case ADD: + //TODO: Track XID for each flowmod + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd()); + break; + case REMOVE: + //TODO: Track XID for each flowmod + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel()); + break; + case MODIFY: + //TODO: Track XID for each flowmod + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod()); + break; + default: + log.error("Unsupported batch operation {}", fbe.getOperator()); + } + } + InstallationFuture installation = new InstallationFuture(sws); + pendingFutures.put(U32.f(batch.hashCode()), installation); + installation.verify(batch.hashCode()); + return installation; + } + + private class InstallationFuture implements Future { + + private final Set sws; + private final AtomicBoolean ok = new AtomicBoolean(true); + private final List offendingFlowMods = Lists.newLinkedList(); + + private final CountDownLatch countDownLatch; + + public InstallationFuture(Set sws) { + this.sws = sws; + countDownLatch = new CountDownLatch(sws.size()); + } + + public void fail(OFErrorMsg msg, Dpid dpid) { + ok.set(false); + //TODO add reason to flowentry + //TODO handle specific error msgs + //offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.)); + switch (msg.getErrType()) { + case BAD_ACTION: + break; + case BAD_INSTRUCTION: + break; + case BAD_MATCH: + break; + case BAD_REQUEST: + break; + case EXPERIMENTER: + break; + case FLOW_MOD_FAILED: + break; + case GROUP_MOD_FAILED: + break; + case HELLO_FAILED: + break; + case METER_MOD_FAILED: + break; + case PORT_MOD_FAILED: + break; + case QUEUE_OP_FAILED: + break; + case ROLE_REQUEST_FAILED: + break; + case SWITCH_CONFIG_FAILED: + break; + case TABLE_FEATURES_FAILED: + break; + case TABLE_MOD_FAILED: + break; + default: + break; + + } + + } + + public void satisfyRequirement(Dpid dpid) { + log.warn("Satisfaction from switch {}", dpid); + sws.remove(controller.getSwitch(dpid)); + countDownLatch.countDown(); + } + + public void verify(Integer id) { + for (Dpid dpid : sws) { + OpenFlowSwitch sw = controller.getSwitch(dpid); + OFBarrierRequest.Builder builder = sw.factory() + .buildBarrierRequest() + .setXid(id); + sw.sendMsg(builder.build()); + } + + + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isCancelled() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isDone() { + return sws.isEmpty(); + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + countDownLatch.await(); + //return offendingFlowMods; + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, + TimeoutException { + countDownLatch.await(timeout, unit); + //return offendingFlowMods; + return null; + } + + } }