diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java index bd5c8c4a73..ed2a6785f6 100644 --- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java +++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java @@ -472,6 +472,11 @@ public class SegmentRoutingManager implements SegmentRoutingService { "staleLinkAge", "15000"); compCfgService.preSetProperty("org.onosproject.net.host.impl.HostManager", "allowDuplicateIps", "false"); + // For P4 switches + compCfgService.preSetProperty("org.onosproject.net.flow.impl.FlowRuleManager", + "fallbackFlowPollFrequency", "5"); + compCfgService.preSetProperty("org.onosproject.net.group.impl.GroupManager", + "fallbackGroupPollFrequency", "5"); compCfgService.registerProperties(getClass()); modified(context); diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java index 15c1a0d867..0dd5f21504 100644 --- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java +++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java @@ -19,12 +19,12 @@ package org.onosproject.drivers.p4runtime; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Striped; -import org.apache.commons.lang3.tuple.Pair; import org.onosproject.drivers.p4runtime.mirror.P4RuntimeGroupMirror; import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMulticastGroupMirror; import org.onosproject.drivers.p4runtime.mirror.TimedEntry; import org.onosproject.net.DeviceId; import org.onosproject.net.group.DefaultGroup; +import org.onosproject.net.group.DefaultGroupDescription; import org.onosproject.net.group.Group; import org.onosproject.net.group.GroupDescription; import org.onosproject.net.group.GroupOperation; @@ -116,17 +116,23 @@ public class P4RuntimeGroupProgrammable if (!setupBehaviour()) { return; } - groupOps.operations().stream() - // Get group type and operation type - .map(op -> Pair.of(groupStore.getGroup(deviceId, op.groupId()), - op.opType())) - .forEach(pair -> { - if (pair.getLeft().type().equals(GroupDescription.Type.ALL)) { - processMcGroupOp(deviceId, pair.getLeft(), pair.getRight()); - } else { - processGroupOp(deviceId, pair.getLeft(), pair.getRight()); - } - }); + groupOps.operations().forEach(op -> { + // ONOS-7785 We need app cookie (action profile id) from the group + Group groupOnStore = groupStore.getGroup(deviceId, op.groupId()); + GroupDescription groupDesc = new DefaultGroupDescription(deviceId, + op.groupType(), + op.buckets(), + groupOnStore.appCookie(), + op.groupId().id(), + groupOnStore.appId()); + DefaultGroup groupToApply = new DefaultGroup(op.groupId(), groupDesc); + if (op.groupType().equals(GroupDescription.Type.ALL)) { + processMcGroupOp(deviceId, groupToApply, op.opType()); + } else { + + processGroupOp(deviceId, groupToApply, op.opType()); + } + }); } @Override diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java index 363a95c055..e344404d20 100644 --- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java +++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java @@ -16,6 +16,8 @@ package org.onosproject.pipelines.fabric.pipeliner; +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalCause; @@ -24,6 +26,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.onlab.util.KryoNamespace; +import org.onlab.util.Tools; import org.onosproject.core.GroupId; import org.onosproject.net.DeviceId; import org.onosproject.net.PortNumber; @@ -56,7 +59,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -81,6 +87,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli // TODO: make this configurable private static final long DEFAULT_INSTALLATION_TIME_OUT = 40; + private static final int NUM_CALLBACK_THREAD = 2; protected DeviceId deviceId; protected FlowRuleService flowRuleService; @@ -92,7 +99,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli protected FabricNextPipeliner pipelinerNext; private Map pendingInstallObjectiveFlows = new ConcurrentHashMap<>(); - private Map pendingInstallObjectiveGroups = new ConcurrentHashMap<>(); + private Map pendingInstallObjectiveGroups = new ConcurrentHashMap<>(); private Cache pendingInstallObjectives = CacheBuilder.newBuilder() .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS) .removalListener((RemovalListener) removalNotification -> { @@ -103,6 +110,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli } }) .build(); + private static ExecutorService flowObjCallbackExecutor = + Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log)); @Override @@ -126,8 +135,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli return; } - applyTranslationResult(filterObjective, result, success -> { - if (success) { + applyTranslationResult(filterObjective, result, error -> { + if (error == null) { success(filterObjective); } else { fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED); @@ -143,8 +152,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli return; } - applyTranslationResult(forwardObjective, result, success -> { - if (success) { + applyTranslationResult(forwardObjective, result, error -> { + if (error == null) { success(forwardObjective); } else { fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED); @@ -168,25 +177,22 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli return; } - applyTranslationResult(nextObjective, result, success -> { - if (!success) { - fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED); + applyTranslationResult(nextObjective, result, error -> { + if (error != null) { + fail(nextObjective, error); return; } // Success, put next group to objective store List portNumbers = Lists.newArrayList(); nextObjective.next().forEach(treatment -> { - Instructions.OutputInstruction outputInst = treatment.allInstructions() + treatment.allInstructions() .stream() .filter(inst -> inst.type() == Instruction.Type.OUTPUT) .map(inst -> (Instructions.OutputInstruction) inst) .findFirst() - .orElse(null); - - if (outputInst != null) { - portNumbers.add(outputInst.port()); - } + .map(Instructions.OutputInstruction::port) + .ifPresent(portNumbers::add); }); FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(), portNumbers); @@ -208,23 +214,25 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli private void applyTranslationResult(Objective objective, PipelinerTranslationResult result, - Consumer callback) { + Consumer callback) { Collection groups = result.groups(); Collection flowRules = result.flowRules(); Set flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet()); - Set groupIds = groups.stream().map(GroupDescription::givenGroupId) - .map(GroupId::new).collect(Collectors.toSet()); + Set pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId) + .map(GroupId::new) + .map(gid -> new PendingGroupKey(gid, objective.op())) + .collect(Collectors.toSet()); PendingInstallObjective pio = - new PendingInstallObjective(objective, flowIds, groupIds, callback); + new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback); flowIds.forEach(flowId -> { pendingInstallObjectiveFlows.put(flowId, pio); }); - groupIds.forEach(groupId -> { - pendingInstallObjectiveGroups.put(groupId, pio); + pendingGroupKeys.forEach(pendingGroupKey -> { + pendingInstallObjectiveGroups.put(pendingGroupKey, pio); }); pendingInstallObjectives.put(objective, pio); @@ -305,11 +313,16 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli } static void fail(Objective objective, ObjectiveError error) { - objective.context().ifPresent(ctx -> ctx.onError(objective, error)); + CompletableFuture.runAsync(() -> { + objective.context().ifPresent(ctx -> ctx.onError(objective, error)); + }, flowObjCallbackExecutor); + } static void success(Objective objective) { - objective.context().ifPresent(ctx -> ctx.onSuccess(objective)); + CompletableFuture.runAsync(() -> { + objective.context().ifPresent(ctx -> ctx.onSuccess(objective)); + }, flowObjCallbackExecutor); } static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection flowRules, @@ -317,16 +330,13 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli FlowRuleOperations.Builder ops = FlowRuleOperations.builder(); switch (objective.op()) { case ADD: + case ADD_TO_EXISTING: // For egress VLAN flowRules.forEach(ops::add); break; case REMOVE: + case REMOVE_FROM_EXISTING: // For egress VLAN flowRules.forEach(ops::remove); break; - case ADD_TO_EXISTING: - case REMOVE_FROM_EXISTING: - // Next objective may use ADD_TO_EXIST or REMOVE_FROM_EXIST op - // No need to update FlowRuls for vlan_meta table. - return null; default: log.warn("Unsupported op {} for {}", objective.op(), objective); fail(objective, ObjectiveError.BADPARAMS); @@ -362,33 +372,33 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli @Override public void event(GroupEvent event) { GroupId groupId = event.subject().id(); - PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId); - if (pio == null) { - return; - } + PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type()); + PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey); if (GROUP_FAILED_TYPES.contains(event.type())) { pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED); } - pio.groupInstalled(groupId); + pio.groupInstalled(pendingGroupKey); } @Override public boolean isRelevant(GroupEvent event) { - return pendingInstallObjectiveGroups.containsKey(event.subject().id()); + PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type()); + return pendingInstallObjectiveGroups.containsKey(pendingGroupKey); } } class PendingInstallObjective { Objective objective; Collection flowIds; - Collection groupIds; - Consumer callback; + Collection pendingGroupKeys; + Consumer callback; public PendingInstallObjective(Objective objective, Collection flowIds, - Collection groupIds, Consumer callback) { + Collection pendingGroupKeys, + Consumer callback) { this.objective = objective; this.flowIds = flowIds; - this.groupIds = groupIds; + this.pendingGroupKeys = pendingGroupKeys; this.callback = callback; } @@ -397,23 +407,79 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli checkIfFinished(); } - void groupInstalled(GroupId groupId) { - groupIds.remove(groupId); + void groupInstalled(PendingGroupKey pendingGroupKey) { + pendingGroupKeys.remove(pendingGroupKey); checkIfFinished(); } private void checkIfFinished() { - if (flowIds.isEmpty() && groupIds.isEmpty()) { + if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) { pendingInstallObjectives.invalidate(objective); - callback.accept(true); + callback.accept(null); } } void failed(ObjectiveError error) { flowIds.forEach(pendingInstallObjectiveFlows::remove); - groupIds.forEach(pendingInstallObjectiveGroups::remove); + pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove); pendingInstallObjectives.invalidate(objective); - fail(objective, error); + callback.accept(error); + } + } + + class PendingGroupKey { + private GroupId groupId; + private GroupEvent.Type expectedEventType; + + PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) { + this.groupId = groupId; + this.expectedEventType = expectedEventType; + } + + PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) { + this.groupId = groupId; + + switch (objOp) { + case ADD: + expectedEventType = GroupEvent.Type.GROUP_ADDED; + break; + case REMOVE: + expectedEventType = GroupEvent.Type.GROUP_REMOVED; + break; + case MODIFY: + case ADD_TO_EXISTING: + case REMOVE_FROM_EXISTING: + expectedEventType = GroupEvent.Type.GROUP_UPDATED; + break; + default: + expectedEventType = null; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PendingGroupKey pendingGroupKey = (PendingGroupKey) o; + return Objects.equal(groupId, pendingGroupKey.groupId) && + expectedEventType == pendingGroupKey.expectedEventType; + } + + @Override + public int hashCode() { + return Objects.hashCode(groupId, expectedEventType); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("groupId", groupId) + .add("expectedEventType", expectedEventType) + .toString(); } } }