diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java index ebc1da4dd8..8083c2b9c0 100644 --- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java +++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java @@ -93,14 +93,14 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp private ServiceDirectory serviceDirectory; protected FlowRuleService flowRuleService; private CoreService coreService; - private GroupService groupService; + protected GroupService groupService; protected MeterService meterService; - private FlowObjectiveStore flowObjectiveStore; + protected FlowObjectiveStore flowObjectiveStore; protected DeviceId deviceId; protected ApplicationId appId; protected DeviceService deviceService; - private KryoNamespace appKryo = new KryoNamespace.Builder() + protected KryoNamespace appKryo = new KryoNamespace.Builder() .register(GroupKey.class) .register(DefaultGroupKey.class) .register(CorsaGroup.class) @@ -108,6 +108,8 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp .build("AbstractCorsaPipeline"); private Cache pendingGroups; + protected Cache pendingNext; + private ScheduledExecutorService groupChecker = Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", @@ -131,6 +133,16 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp } }).build(); + pendingNext = CacheBuilder.newBuilder() + .expireAfterWrite(20, TimeUnit.SECONDS) + .removalListener((RemovalNotification notification) -> { + if (notification.getCause() == RemovalCause.EXPIRED) { + notification.getValue().context() + .ifPresent(c -> c.onError(notification.getValue(), + ObjectiveError.FLOWINSTALLATIONFAILED)); + } + }).build(); + groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS); coreService = serviceDirectory.get(CoreService.class); @@ -304,6 +316,7 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp @Override public void forward(ForwardingObjective fwd) { + Collection rules; FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder(); @@ -354,16 +367,20 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp private Collection processSpecific(ForwardingObjective fwd) { log.debug("Processing specific forwarding objective"); TrafficSelector selector = fwd.selector(); - EthTypeCriterion ethType = + EthTypeCriterion ethTypeCriterion = (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE); - if (ethType != null) { - short et = ethType.ethType().toShort(); + VlanIdCriterion vlanIdCriterion = + (VlanIdCriterion) selector.getCriterion(Criterion.Type.VLAN_VID); + if (ethTypeCriterion != null) { + short et = ethTypeCriterion.ethType().toShort(); if (et == Ethernet.TYPE_IPV4) { return processSpecificRoute(fwd); } else if (et == Ethernet.TYPE_VLAN) { /* The ForwardingObjective must specify VLAN ethtype in order to use the Transit Circuit */ return processSpecificSwitch(fwd); } + } else if (vlanIdCriterion != null) { + return processSpecificSwitch(fwd); } fail(fwd, ObjectiveError.UNSUPPORTED); @@ -464,6 +481,41 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp //Hook for modifying Route flow rule protected abstract Builder processSpecificRoutingRule(Builder rb); + protected enum CorsaTrafficTreatmentType { + /** + * If the treatment has to be handled as group. + */ + GROUP, + /** + * If the treatment has to be handled as simple set of actions. + */ + ACTIONS + } + + /** + * Helper class to encapsulate both traffic treatment and + * type of treatment. + */ + protected class CorsaTrafficTreatment { + + private CorsaTrafficTreatmentType type; + private TrafficTreatment trafficTreatment; + + public CorsaTrafficTreatment(CorsaTrafficTreatmentType treatmentType, TrafficTreatment trafficTreatment) { + this.type = treatmentType; + this.trafficTreatment = trafficTreatment; + } + + public CorsaTrafficTreatmentType type() { + return type; + } + + public TrafficTreatment treatment() { + return trafficTreatment; + } + + } + @Override public void next(NextObjective nextObjective) { switch (nextObjective.type()) { @@ -471,20 +523,25 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp Collection treatments = nextObjective.next(); if (treatments.size() == 1) { TrafficTreatment treatment = treatments.iterator().next(); - treatment = processNextTreatment(treatment); - GroupBucket bucket = - DefaultGroupBucket.createIndirectGroupBucket(treatment); + CorsaTrafficTreatment corsaTreatment = processNextTreatment(treatment); final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id())); - GroupDescription groupDescription - = new DefaultGroupDescription(deviceId, - GroupDescription.Type.INDIRECT, - new GroupBuckets(Collections - .singletonList(bucket)), - key, - null, // let group service determine group id - nextObjective.appId()); - groupService.addGroup(groupDescription); - pendingGroups.put(key, nextObjective); + if (corsaTreatment.type() == CorsaTrafficTreatmentType.GROUP) { + GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(corsaTreatment.treatment()); + GroupBuckets buckets = new GroupBuckets(Collections.singletonList(bucket)); + // group id == null, let group service determine group id + GroupDescription groupDescription = new DefaultGroupDescription(deviceId, + GroupDescription.Type.INDIRECT, + buckets, + key, + null, + nextObjective.appId()); + groupService.addGroup(groupDescription); + pendingGroups.put(key, nextObjective); + } else if (corsaTreatment.type() == CorsaTrafficTreatmentType.ACTIONS) { + pendingNext.put(nextObjective.id(), nextObjective); + flowObjectiveStore.putNextGroup(nextObjective.id(), new CorsaGroup(key)); + nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective)); + } } break; case HASHED: @@ -501,8 +558,8 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp } //Hook for altering the NextObjective treatment - protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) { - return treatment; + protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) { + return new CorsaTrafficTreatment(CorsaTrafficTreatmentType.GROUP, treatment); } //Init helper: Table Miss = Drop diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java index 10e3f0b1aa..b1e15b2e34 100644 --- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java +++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java @@ -30,9 +30,12 @@ import org.onosproject.net.flow.criteria.EthCriterion; import org.onosproject.net.flow.criteria.IPCriterion; import org.onosproject.net.flow.criteria.PortCriterion; import org.onosproject.net.flow.criteria.VlanIdCriterion; +import org.onosproject.net.flow.instructions.Instruction; import org.onosproject.net.flow.instructions.L2ModificationInstruction; import org.onosproject.net.flowobjective.FilteringObjective; import org.onosproject.net.flowobjective.ForwardingObjective; +import org.onosproject.net.flowobjective.NextObjective; +import org.onosproject.net.flowobjective.ObjectiveError; import org.onosproject.net.meter.Band; import org.onosproject.net.meter.DefaultBand; import org.onosproject.net.meter.DefaultMeterRequest; @@ -69,9 +72,11 @@ public class CorsaPipelineV3 extends AbstractCorsaPipeline { protected MeterId defaultMeterId = null; @Override - protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) { + protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) { TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder(); + + treatment.immediate().stream() .filter(i -> { switch (i.type()) { @@ -87,7 +92,48 @@ public class CorsaPipelineV3 extends AbstractCorsaPipeline { return false; } }).forEach(i -> tb.add(i)); - return tb.build(); + + TrafficTreatment t = tb.build(); + + + boolean isPresentModVlanId = false; + boolean isPresentModEthSrc = false; + boolean isPresentModEthDst = false; + boolean isPresentOutpuPort = false; + + for (Instruction instruction : t.immediate()) { + switch (instruction.type()) { + case L2MODIFICATION: + L2ModificationInstruction l2i = (L2ModificationInstruction) instruction; + if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) { + isPresentModVlanId = true; + } + + if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) { + L2ModificationInstruction.L2SubType subType = l2i.subtype(); + if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) { + isPresentModEthSrc = true; + } else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) { + isPresentModEthDst = true; + } + } + case OUTPUT: + isPresentOutpuPort = true; + default: + } + } + CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS; + /** + * This represents the allowed group for CorsaPipelinev3 + */ + if (isPresentModVlanId && + isPresentModEthSrc && + isPresentModEthDst && + isPresentOutpuPort) { + type = CorsaTrafficTreatmentType.GROUP; + } + CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t); + return corsaTreatment; } @Override @@ -115,9 +161,37 @@ public class CorsaPipelineV3 extends AbstractCorsaPipeline { .withPriority(fwd.priority()) .forDevice(deviceId) .withSelector(filteredSelector) - .withTreatment(fwd.treatment()) .forTable(VLAN_CIRCUIT_TABLE); + if (fwd.treatment() != null) { + ruleBuilder.withTreatment(fwd.treatment()); + } else { + if (fwd.nextId() != null) { + NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId()); + if (nextObjective != null) { + pendingNext.invalidate(fwd.nextId()); + TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder() + .setVlanPcp((byte) 0) + .setQueue(0) + .meter(defaultMeterId); + nextObjective.next().forEach(trafficTreatment -> { + trafficTreatment.allInstructions().forEach(instruction -> { + treatment.add(instruction); + }); + }); + ruleBuilder.withTreatment(treatment.build()); + } else { + log.warn("The group left!"); + fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING)); + return ImmutableSet.of(); + } + } else { + log.warn("Missing NextObjective ID for ForwardingObjective {}", fwd.id()); + fail(fwd, ObjectiveError.BADPARAMS); + return ImmutableSet.of(); + } + } + if (fwd.permanent()) { ruleBuilder.makePermanent(); } else { diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java index 9979cc101a..514ac78e31 100644 --- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java +++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java @@ -29,6 +29,7 @@ import org.onosproject.net.flow.TrafficTreatment; import org.onosproject.net.flow.criteria.Criterion; import org.onosproject.net.flow.criteria.IPCriterion; import org.onosproject.net.flow.criteria.IPProtocolCriterion; +import org.onosproject.net.flow.instructions.Instruction; import org.onosproject.net.flow.instructions.Instructions; import org.onosproject.net.flow.instructions.L2ModificationInstruction; import org.onosproject.net.flowobjective.ForwardingObjective; @@ -226,16 +227,14 @@ public class CorsaPipelineV39 extends CorsaPipelineV3 { } @Override - protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) { + protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) { TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder(); - tb.add(Instructions.popVlan()); treatment.immediate().stream() .filter(i -> { switch (i.type()) { case L2MODIFICATION: L2ModificationInstruction l2i = (L2ModificationInstruction) i; if (l2i.subtype() == VLAN_ID || - l2i.subtype() == VLAN_POP || l2i.subtype() == VLAN_POP || l2i.subtype() == ETH_DST || l2i.subtype() == ETH_SRC) { @@ -247,6 +246,51 @@ public class CorsaPipelineV39 extends CorsaPipelineV3 { return false; } }).forEach(i -> tb.add(i)); - return tb.build(); + + TrafficTreatment t = tb.build(); + + boolean isPresentModVlanId = false; + boolean isPresentModEthSrc = false; + boolean isPresentModEthDst = false; + boolean isPresentOutpuPort = false; + + for (Instruction instruction : t.immediate()) { + switch (instruction.type()) { + case L2MODIFICATION: + L2ModificationInstruction l2i = (L2ModificationInstruction) instruction; + if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) { + isPresentModVlanId = true; + } + + if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) { + L2ModificationInstruction.L2SubType subType = l2i.subtype(); + if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) { + isPresentModEthSrc = true; + } else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) { + isPresentModEthDst = true; + } + } + case OUTPUT: + isPresentOutpuPort = true; + default: + } + } + CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS; + /** + * These are the allowed groups for CorsaPipelinev39 + */ + if (isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) { + type = CorsaTrafficTreatmentType.GROUP; + + } else if ((!isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) || + (!isPresentModVlanId && !isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) || + (!isPresentModVlanId && !isPresentModEthSrc && !isPresentModEthDst && isPresentOutpuPort)) { + type = CorsaTrafficTreatmentType.GROUP; + TrafficTreatment.Builder tb2 = DefaultTrafficTreatment.builder(t); + tb2.add(Instructions.popVlan()); + t = tb2.build(); + } + CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t); + return corsaTreatment; } }