From ab7a1f6875c284f00677723d593be28cff2dcedc Mon Sep 17 00:00:00 2001 From: Yoonseon Han Date: Thu, 18 May 2017 15:31:09 -0700 Subject: [PATCH] fix: (vNet) reflect changes of flow objective service To reflect changes for ONOS-6476. Paired with core flow objective service. Change-Id: I67c323fe6863176ac2b8ca73774d1ee7261b69c0 --- .../VirtualNetworkFlowObjectiveManager.java | 171 +++++++++++++----- 1 file changed, 128 insertions(+), 43 deletions(-) diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java index b28a34e27c..36b286d518 100644 --- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java +++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java @@ -102,7 +102,15 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService private final PipelinerContext context = new InnerPipelineContext(); private final Map pipeliners = Maps.newConcurrentMap(); - private final Map> pendingForwards = Maps.newConcurrentMap(); + + // local stores for queuing fwd and next objectives that are waiting for an + // associated next objective execution to complete. The signal for completed + // execution comes from a pipeline driver, in this or another controller + // instance, via the DistributedFlowObjectiveStore. + private final Map> pendingForwards = + Maps.newConcurrentMap(); + private final Map> pendingNexts = + Maps.newConcurrentMap(); // local store to track which nextObjectives were sent to which device // for debugging purposes @@ -133,16 +141,24 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService @Override public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) { - if (queueObjective(deviceId, forwardingObjective)) { - return; + if (forwardingObjective.nextId() == null || + forwardingObjective.op() == Objective.Operation.REMOVE || + flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null || + !queueFwdObjective(deviceId, forwardingObjective)) { + // fast path + executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective)); } - executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective)); } @Override public void next(DeviceId deviceId, NextObjective nextObjective) { nextToDevice.put(nextObjective.id(), deviceId); - executorService.execute(new ObjectiveInstaller(deviceId, nextObjective)); + if (nextObjective.op() == Objective.Operation.ADD || + flowObjectiveStore.getNextGroup(nextObjective.id()) != null || + !queueNextObjective(deviceId, nextObjective)) { + // either group exists or we are trying to create it - let it through + executorService.execute(new ObjectiveInstaller(deviceId, nextObjective)); + } } @Override @@ -186,19 +202,43 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService @Override public List getPendingFlowObjectives() { - List pendingNexts = new ArrayList<>(); + List pendingFlowObjectives = new ArrayList<>(); + for (Integer nextId : pendingForwards.keySet()) { - Set pnext = pendingForwards.get(nextId); + Set pfwd = pendingForwards.get(nextId); StringBuilder pend = new StringBuilder(); - pend.append("Next Id: ").append(Integer.toString(nextId)) - .append(" :: "); - for (PendingNext pn : pnext) { - pend.append(Integer.toString(pn.forwardingObjective().id())) - .append(" "); + pend.append("NextId: ") + .append(nextId); + for (PendingFlowObjective pf : pfwd) { + pend.append("\n FwdId: ") + .append(String.format("%11s", pf.flowObjective().id())) + .append(", DeviceId: ") + .append(pf.deviceId()) + .append(", Selector: ") + .append(((ForwardingObjective) pf.flowObjective()) + .selector().criteria()); } - pendingNexts.add(pend.toString()); + pendingFlowObjectives.add(pend.toString()); } - return pendingNexts; + + for (Integer nextId : pendingNexts.keySet()) { + Set pnext = pendingNexts.get(nextId); + StringBuilder pend = new StringBuilder(); + pend.append("NextId: ") + .append(nextId); + for (PendingFlowObjective pn : pnext) { + pend.append("\n NextOp: ") + .append(pn.flowObjective().op()) + .append(", DeviceId: ") + .append(pn.deviceId()) + .append(", Treatments: ") + .append(((NextObjective) pn.flowObjective()) + .next()); + } + pendingFlowObjectives.add(pend.toString()); + } + + return pendingFlowObjectives; } @Override @@ -206,23 +246,18 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService return getPendingFlowObjectives(); } - private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) { - if (fwd.nextId() == null || - flowObjectiveStore.getNextGroup(fwd.nextId()) != null) { - // fast path - return false; - } + private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) { boolean queued = false; synchronized (pendingForwards) { // double check the flow objective store, because this block could run // after a notification arrives if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) { pendingForwards.compute(fwd.nextId(), (id, pending) -> { - PendingNext next = new PendingNext(deviceId, fwd); + PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd); if (pending == null) { - return Sets.newHashSet(next); + return Sets.newHashSet(pendfo); } else { - pending.add(next); + pending.add(pendfo); return pending; } }); @@ -236,6 +271,34 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService return queued; } + private boolean queueNextObjective(DeviceId deviceId, NextObjective next) { + + // we need to hold off on other operations till we get notified that the + // initial group creation has succeeded + boolean queued = false; + synchronized (pendingNexts) { + // double check the flow objective store, because this block could run + // after a notification arrives + if (flowObjectiveStore.getNextGroup(next.id()) == null) { + pendingNexts.compute(next.id(), (id, pending) -> { + PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next); + if (pending == null) { + return Sets.newHashSet(pendfo); + } else { + pending.add(pendfo); + return pending; + } + }); + queued = true; + } + } + if (queued) { + log.debug("Queued next objective {} with operation {} meant for device {}", + next.id(), next.op(), deviceId); + } + return queued; + } + /** * Task that passes the flow objective down to the driver. The task will * make a few attempts to find the appropriate driver, then eventually give @@ -264,6 +327,7 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService if (pipeliner != null) { if (objective instanceof NextObjective) { + nextToDevice.put(objective.id(), deviceId); pipeliner.next((NextObjective) objective); } else if (objective instanceof ForwardingObjective) { pipeliner.forward((ForwardingObjective) objective); @@ -280,7 +344,7 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService objective.context().ifPresent( c -> c.onError(objective, ObjectiveError.NOPIPELINER)); } - //Excpetion thrown + //Exception thrown } catch (Exception e) { log.warn("Exception while installing flow objective", e); } @@ -292,21 +356,37 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService public void notify(ObjectiveEvent event) { if (event.type() == ObjectiveEvent.Type.ADD) { log.debug("Received notification of obj event {}", event); - Set pending; + Set pending; + + // first send all pending flows synchronized (pendingForwards) { // needs to be synchronized for queueObjective lookup pending = pendingForwards.remove(event.subject()); } - if (pending == null) { - log.debug("Nothing pending for this obj event {}", event); - return; + log.debug("No forwarding objectives pending for this " + + "obj event {}", event); + } else { + log.debug("Processing {} pending forwarding objectives for nextId {}", + pending.size(), event.subject()); + pending.forEach(p -> getDevicePipeliner(p.deviceId()) + .forward((ForwardingObjective) p.flowObjective())); } - log.debug("Processing {} pending forwarding objectives for nextId {}", - pending.size(), event.subject()); - pending.forEach(p -> getDevicePipeliner(p.deviceId()) - .forward(p.forwardingObjective())); + // now check for pending next-objectives + synchronized (pendingNexts) { + // needs to be synchronized for queueObjective lookup + pending = pendingNexts.remove(event.subject()); + } + if (pending == null) { + log.debug("No next objectives pending for this " + + "obj event {}", event); + } else { + log.debug("Processing {} pending next objectives for nextId {}", + pending.size(), event.subject()); + pending.forEach(p -> getDevicePipeliner(p.deviceId()) + .next((NextObjective) p.flowObjective())); + } } } } @@ -353,29 +433,34 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService } /** - * Data class used to hold a pending forwarding objective that could not + * Data class used to hold a pending flow objective that could not * be processed because the associated next object was not present. + * Note that this pending flow objective could be a forwarding objective + * waiting for a next objective to complete execution. Or it could a + * next objective (with a different operation - remove, addToExisting, or + * removeFromExisting) waiting for a next objective with the same id to + * complete execution. */ - private class PendingNext { + private class PendingFlowObjective { private final DeviceId deviceId; - private final ForwardingObjective fwd; + private final Objective flowObj; - public PendingNext(DeviceId deviceId, ForwardingObjective fwd) { + public PendingFlowObjective(DeviceId deviceId, Objective flowObj) { this.deviceId = deviceId; - this.fwd = fwd; + this.flowObj = flowObj; } public DeviceId deviceId() { return deviceId; } - public ForwardingObjective forwardingObjective() { - return fwd; + public Objective flowObjective() { + return flowObj; } @Override public int hashCode() { - return Objects.hash(deviceId, fwd); + return Objects.hash(deviceId, flowObj); } @Override @@ -383,12 +468,12 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService if (this == obj) { return true; } - if (!(obj instanceof PendingNext)) { + if (!(obj instanceof PendingFlowObjective)) { return false; } - final PendingNext other = (PendingNext) obj; + final PendingFlowObjective other = (PendingFlowObjective) obj; if (this.deviceId.equals(other.deviceId) && - this.fwd.equals(other.fwd)) { + this.flowObj.equals(other.flowObj)) { return true; } return false;