fix: (vNet) reflect changes of flow objective service

To reflect changes for ONOS-6476.
Paired with core flow objective service.

Change-Id: I67c323fe6863176ac2b8ca73774d1ee7261b69c0
This commit is contained in:
Yoonseon Han 2017-05-18 15:31:09 -07:00
parent 5a1053eb46
commit ab7a1f6875

View File

@ -102,7 +102,15 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
private final PipelinerContext context = new InnerPipelineContext();
private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
// local stores for queuing fwd and next objectives that are waiting for an
// associated next objective execution to complete. The signal for completed
// execution comes from a pipeline driver, in this or another controller
// instance, via the DistributedFlowObjectiveStore.
private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
Maps.newConcurrentMap();
private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
Maps.newConcurrentMap();
// local store to track which nextObjectives were sent to which device
// for debugging purposes
@ -133,16 +141,24 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
@Override
public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
if (queueObjective(deviceId, forwardingObjective)) {
return;
if (forwardingObjective.nextId() == null ||
forwardingObjective.op() == Objective.Operation.REMOVE ||
flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
!queueFwdObjective(deviceId, forwardingObjective)) {
// fast path
executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
}
executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
}
@Override
public void next(DeviceId deviceId, NextObjective nextObjective) {
nextToDevice.put(nextObjective.id(), deviceId);
executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
if (nextObjective.op() == Objective.Operation.ADD ||
flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
!queueNextObjective(deviceId, nextObjective)) {
// either group exists or we are trying to create it - let it through
executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
}
}
@Override
@ -186,19 +202,43 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
@Override
public List<String> getPendingFlowObjectives() {
List<String> pendingNexts = new ArrayList<>();
List<String> pendingFlowObjectives = new ArrayList<>();
for (Integer nextId : pendingForwards.keySet()) {
Set<PendingNext> pnext = pendingForwards.get(nextId);
Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
StringBuilder pend = new StringBuilder();
pend.append("Next Id: ").append(Integer.toString(nextId))
.append(" :: ");
for (PendingNext pn : pnext) {
pend.append(Integer.toString(pn.forwardingObjective().id()))
.append(" ");
pend.append("NextId: ")
.append(nextId);
for (PendingFlowObjective pf : pfwd) {
pend.append("\n FwdId: ")
.append(String.format("%11s", pf.flowObjective().id()))
.append(", DeviceId: ")
.append(pf.deviceId())
.append(", Selector: ")
.append(((ForwardingObjective) pf.flowObjective())
.selector().criteria());
}
pendingNexts.add(pend.toString());
pendingFlowObjectives.add(pend.toString());
}
return pendingNexts;
for (Integer nextId : pendingNexts.keySet()) {
Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
StringBuilder pend = new StringBuilder();
pend.append("NextId: ")
.append(nextId);
for (PendingFlowObjective pn : pnext) {
pend.append("\n NextOp: ")
.append(pn.flowObjective().op())
.append(", DeviceId: ")
.append(pn.deviceId())
.append(", Treatments: ")
.append(((NextObjective) pn.flowObjective())
.next());
}
pendingFlowObjectives.add(pend.toString());
}
return pendingFlowObjectives;
}
@Override
@ -206,23 +246,18 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
return getPendingFlowObjectives();
}
private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
if (fwd.nextId() == null ||
flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
// fast path
return false;
}
private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
boolean queued = false;
synchronized (pendingForwards) {
// double check the flow objective store, because this block could run
// after a notification arrives
if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
pendingForwards.compute(fwd.nextId(), (id, pending) -> {
PendingNext next = new PendingNext(deviceId, fwd);
PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
if (pending == null) {
return Sets.newHashSet(next);
return Sets.newHashSet(pendfo);
} else {
pending.add(next);
pending.add(pendfo);
return pending;
}
});
@ -236,6 +271,34 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
return queued;
}
private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
// we need to hold off on other operations till we get notified that the
// initial group creation has succeeded
boolean queued = false;
synchronized (pendingNexts) {
// double check the flow objective store, because this block could run
// after a notification arrives
if (flowObjectiveStore.getNextGroup(next.id()) == null) {
pendingNexts.compute(next.id(), (id, pending) -> {
PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
if (pending == null) {
return Sets.newHashSet(pendfo);
} else {
pending.add(pendfo);
return pending;
}
});
queued = true;
}
}
if (queued) {
log.debug("Queued next objective {} with operation {} meant for device {}",
next.id(), next.op(), deviceId);
}
return queued;
}
/**
* Task that passes the flow objective down to the driver. The task will
* make a few attempts to find the appropriate driver, then eventually give
@ -264,6 +327,7 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
if (pipeliner != null) {
if (objective instanceof NextObjective) {
nextToDevice.put(objective.id(), deviceId);
pipeliner.next((NextObjective) objective);
} else if (objective instanceof ForwardingObjective) {
pipeliner.forward((ForwardingObjective) objective);
@ -280,7 +344,7 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
objective.context().ifPresent(
c -> c.onError(objective, ObjectiveError.NOPIPELINER));
}
//Excpetion thrown
//Exception thrown
} catch (Exception e) {
log.warn("Exception while installing flow objective", e);
}
@ -292,21 +356,37 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
public void notify(ObjectiveEvent event) {
if (event.type() == ObjectiveEvent.Type.ADD) {
log.debug("Received notification of obj event {}", event);
Set<PendingNext> pending;
Set<PendingFlowObjective> pending;
// first send all pending flows
synchronized (pendingForwards) {
// needs to be synchronized for queueObjective lookup
pending = pendingForwards.remove(event.subject());
}
if (pending == null) {
log.debug("Nothing pending for this obj event {}", event);
return;
log.debug("No forwarding objectives pending for this "
+ "obj event {}", event);
} else {
log.debug("Processing {} pending forwarding objectives for nextId {}",
pending.size(), event.subject());
pending.forEach(p -> getDevicePipeliner(p.deviceId())
.forward((ForwardingObjective) p.flowObjective()));
}
log.debug("Processing {} pending forwarding objectives for nextId {}",
pending.size(), event.subject());
pending.forEach(p -> getDevicePipeliner(p.deviceId())
.forward(p.forwardingObjective()));
// now check for pending next-objectives
synchronized (pendingNexts) {
// needs to be synchronized for queueObjective lookup
pending = pendingNexts.remove(event.subject());
}
if (pending == null) {
log.debug("No next objectives pending for this "
+ "obj event {}", event);
} else {
log.debug("Processing {} pending next objectives for nextId {}",
pending.size(), event.subject());
pending.forEach(p -> getDevicePipeliner(p.deviceId())
.next((NextObjective) p.flowObjective()));
}
}
}
}
@ -353,29 +433,34 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
}
/**
* Data class used to hold a pending forwarding objective that could not
* Data class used to hold a pending flow objective that could not
* be processed because the associated next object was not present.
* Note that this pending flow objective could be a forwarding objective
* waiting for a next objective to complete execution. Or it could a
* next objective (with a different operation - remove, addToExisting, or
* removeFromExisting) waiting for a next objective with the same id to
* complete execution.
*/
private class PendingNext {
private class PendingFlowObjective {
private final DeviceId deviceId;
private final ForwardingObjective fwd;
private final Objective flowObj;
public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
this.deviceId = deviceId;
this.fwd = fwd;
this.flowObj = flowObj;
}
public DeviceId deviceId() {
return deviceId;
}
public ForwardingObjective forwardingObjective() {
return fwd;
public Objective flowObjective() {
return flowObj;
}
@Override
public int hashCode() {
return Objects.hash(deviceId, fwd);
return Objects.hash(deviceId, flowObj);
}
@Override
@ -383,12 +468,12 @@ public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
if (this == obj) {
return true;
}
if (!(obj instanceof PendingNext)) {
if (!(obj instanceof PendingFlowObjective)) {
return false;
}
final PendingNext other = (PendingNext) obj;
final PendingFlowObjective other = (PendingFlowObjective) obj;
if (this.deviceId.equals(other.deviceId) &&
this.fwd.equals(other.fwd)) {
this.flowObj.equals(other.flowObj)) {
return true;
}
return false;