diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java index ca209a355b..61d3824266 100644 --- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java +++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java @@ -18,13 +18,9 @@ package org.onosproject.pipelines.fabric.pipeliner; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalCause; -import com.google.common.cache.RemovalListener; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.collect.Maps; import org.onlab.util.KryoNamespace; import org.onlab.util.Tools; import org.onosproject.core.GroupId; @@ -38,7 +34,6 @@ import org.onosproject.net.driver.Driver; import org.onosproject.net.flow.FlowId; import org.onosproject.net.flow.FlowRule; import org.onosproject.net.flow.FlowRuleOperations; -import org.onosproject.net.flow.FlowRuleOperationsContext; import org.onosproject.net.flow.FlowRuleService; import org.onosproject.net.flow.instructions.Instruction; import org.onosproject.net.flow.instructions.Instructions; @@ -50,7 +45,6 @@ import org.onosproject.net.flowobjective.Objective; import org.onosproject.net.flowobjective.ObjectiveError; import org.onosproject.net.group.GroupDescription; import org.onosproject.net.group.GroupEvent; -import org.onosproject.net.group.GroupListener; import org.onosproject.net.group.GroupService; import org.onosproject.store.serializers.KryoNamespaces; import org.slf4j.Logger; @@ -63,7 +57,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -80,36 +73,20 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli .register(FabricNextGroup.class) .build("FabricPipeliner"); - private static final Set GROUP_FAILED_TYPES = - Sets.newHashSet(GroupEvent.Type.GROUP_ADD_FAILED, - GroupEvent.Type.GROUP_REMOVE_FAILED, - GroupEvent.Type.GROUP_UPDATE_FAILED); - - // TODO: make this configurable - private static final long DEFAULT_INSTALLATION_TIME_OUT = 40; private static final int NUM_CALLBACK_THREAD = 2; protected DeviceId deviceId; protected FlowRuleService flowRuleService; protected GroupService groupService; - protected GroupListener groupListener = new InternalGroupListener(); protected FlowObjectiveStore flowObjectiveStore; - protected FabricFilteringPipeliner pipelinerFilter; - protected FabricForwardingPipeliner pipelinerForward; - protected FabricNextPipeliner pipelinerNext; + FabricFilteringPipeliner pipelinerFilter; + FabricForwardingPipeliner pipelinerForward; + FabricNextPipeliner pipelinerNext; private Map pendingInstallObjectiveFlows = new ConcurrentHashMap<>(); private Map pendingInstallObjectiveGroups = new ConcurrentHashMap<>(); - private Cache pendingInstallObjectives = CacheBuilder.newBuilder() - .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS) - .removalListener((RemovalListener) removalNotification -> { - RemovalCause cause = removalNotification.getCause(); - PendingInstallObjective pio = removalNotification.getValue(); - if (cause == RemovalCause.EXPIRED && pio != null) { - pio.failed(pio.objective, ObjectiveError.INSTALLATIONTIMEOUT); - } - }) - .build(); + private Map pendingInstallObjectives = Maps.newConcurrentMap(); + private static ExecutorService flowObjCallbackExecutor = Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log)); @@ -120,7 +97,6 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli this.deviceId = deviceId; this.flowRuleService = context.directory().get(FlowRuleService.class); this.groupService = context.directory().get(GroupService.class); - this.groupService.addListener(groupListener); this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class); this.pipelinerFilter = new FabricFilteringPipeliner(deviceId); this.pipelinerForward = new FabricForwardingPipeliner(deviceId); @@ -139,7 +115,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli if (error == null) { success(filterObjective); } else { - fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED); + log.info("Ignore error {}. Let flow subsystem retry", error); + success(filterObjective); } }); } @@ -156,7 +133,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli if (error == null) { success(forwardObjective); } else { - fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED); + log.info("Ignore error {}. Let flow subsystem retry", error); + success(forwardObjective); } }); } @@ -177,23 +155,31 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli return; } + if (nextObjective.op() == Objective.Operation.MODIFY) { + // TODO: support MODIFY operation + log.debug("Currently we don't support MODIFY operation, return failure directly to the context"); + fail(nextObjective, ObjectiveError.UNSUPPORTED); + return; + } + applyTranslationResult(nextObjective, result, error -> { if (error != null) { - fail(nextObjective, error); + log.info("Ignore error {}. Let flow/group subsystem retry", error); + success(nextObjective); return; } // Success, put next group to objective store List portNumbers = Lists.newArrayList(); - nextObjective.next().forEach(treatment -> { + nextObjective.next().forEach(treatment -> treatment.allInstructions() .stream() .filter(inst -> inst.type() == Instruction.Type.OUTPUT) .map(inst -> (Instructions.OutputInstruction) inst) .findFirst() .map(Instructions.OutputInstruction::port) - .ifPresent(portNumbers::add); - }); + .ifPresent(portNumbers::add) + ); FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(), portNumbers); flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup); @@ -232,9 +218,9 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli pendingInstallObjectiveFlows.put(pfk, pio); }); - pendingGroupKeys.forEach(pendingGroupKey -> { - pendingInstallObjectiveGroups.put(pendingGroupKey, pio); - }); + pendingGroupKeys.forEach(pendingGroupKey -> + pendingInstallObjectiveGroups.put(pendingGroupKey, pio) + ); pendingInstallObjectives.put(objective, pio); installGroups(objective, groups); @@ -246,42 +232,17 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli return; } - FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() { - @Override - public void onSuccess(FlowRuleOperations ops) { - ops.stages().forEach(stage -> { - stage.forEach(op -> { - FlowId flowId = op.rule().id(); - PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id()); - PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk); + FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules); + flowRuleService.apply(ops); - if (pio != null) { - pio.flowInstalled(flowId); - } - }); - }); + flowRules.forEach(flow -> { + PendingFlowKey pfk = new PendingFlowKey(flow.id(), objective.id()); + PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk); + + if (pio != null) { + pio.flowInstalled(flow.id()); } - - @Override - public void onError(FlowRuleOperations ops) { - log.warn("Failed to install flow rules: {}", flowRules); - PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective); - if (pio != null) { - pio.failed(objective, ObjectiveError.FLOWINSTALLATIONFAILED); - } - } - }; - - FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx); - if (ops != null) { - flowRuleService.apply(ops); - } else { - // remove pendings - flowRules.forEach(flowRule -> { - PendingFlowKey pfk = new PendingFlowKey(flowRule.id(), objective.id()); - pendingInstallObjectiveFlows.remove(pfk); - }); - } + }); } private void installGroups(Objective objective, Collection groups) { @@ -297,41 +258,40 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId())); break; case ADD_TO_EXISTING: - groups.forEach(group -> { - groupService.addBucketsToGroup(deviceId, group.appCookie(), - group.buckets(), - group.appCookie(), - group.appId()); - }); + groups.forEach(group -> groupService.addBucketsToGroup(deviceId, group.appCookie(), + group.buckets(), group.appCookie(), group.appId()) + ); break; case REMOVE_FROM_EXISTING: - groups.forEach(group -> { - groupService.removeBucketsFromGroup(deviceId, group.appCookie(), - group.buckets(), - group.appCookie(), - group.appId()); - }); + groups.forEach(group -> groupService.removeBucketsFromGroup(deviceId, group.appCookie(), + group.buckets(), group.appCookie(), group.appId()) + ); break; default: log.warn("Unsupported objective operation {}", objective.op()); + return; } - } - static void fail(Objective objective, ObjectiveError error) { - CompletableFuture.runAsync(() -> { - objective.context().ifPresent(ctx -> ctx.onError(objective, error)); - }, flowObjCallbackExecutor); + groups.forEach(group -> { + PendingGroupKey pendingGroupKey = new PendingGroupKey(new GroupId(group.givenGroupId()), objective.op()); + PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey); + pio.groupInstalled(pendingGroupKey); + }); } - static void success(Objective objective) { - CompletableFuture.runAsync(() -> { - objective.context().ifPresent(ctx -> ctx.onSuccess(objective)); - }, flowObjCallbackExecutor); + private static void fail(Objective objective, ObjectiveError error) { + CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onError(objective, error)), + flowObjCallbackExecutor); + } - static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection flowRules, - FlowRuleOperationsContext ctx) { + private static void success(Objective objective) { + CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onSuccess(objective)), + flowObjCallbackExecutor); + } + + private static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection flowRules) { FlowRuleOperations.Builder ops = FlowRuleOperations.builder(); switch (objective.op()) { case ADD: @@ -347,23 +307,23 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli fail(objective, ObjectiveError.BADPARAMS); return null; } - return ops.build(ctx); + return ops.build(); } class FabricNextGroup implements NextGroup { private NextObjective.Type type; private Collection outputPorts; - public FabricNextGroup(NextObjective.Type type, Collection outputPorts) { + FabricNextGroup(NextObjective.Type type, Collection outputPorts) { this.type = type; this.outputPorts = ImmutableList.copyOf(outputPorts); } - public NextObjective.Type type() { + NextObjective.Type type() { return type; } - public Collection outputPorts() { + Collection outputPorts() { return outputPorts; } @@ -373,32 +333,13 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli } } - class InternalGroupListener implements GroupListener { - @Override - public void event(GroupEvent event) { - GroupId groupId = event.subject().id(); - PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type()); - PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey); - if (GROUP_FAILED_TYPES.contains(event.type())) { - pio.failed(pio.objective, ObjectiveError.GROUPINSTALLATIONFAILED); - } - pio.groupInstalled(pendingGroupKey); - } - - @Override - public boolean isRelevant(GroupEvent event) { - PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type()); - return pendingInstallObjectiveGroups.containsKey(pendingGroupKey); - } - } - class PendingInstallObjective { Objective objective; Collection flowIds; Collection pendingGroupKeys; Consumer callback; - public PendingInstallObjective(Objective objective, Collection flowIds, + PendingInstallObjective(Objective objective, Collection flowIds, Collection pendingGroupKeys, Consumer callback) { this.objective = objective; @@ -423,7 +364,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli private void checkIfFinished() { if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) { - pendingInstallObjectives.invalidate(objective); + pendingInstallObjectives.remove(objective); callback.accept(null); } } @@ -434,7 +375,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli pendingInstallObjectiveFlows.remove(pfk); }); pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove); - pendingInstallObjectives.invalidate(objective); + pendingInstallObjectives.remove(objective); callback.accept(error); } @@ -509,11 +450,6 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli private GroupId groupId; private GroupEvent.Type expectedEventType; - PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) { - this.groupId = groupId; - this.expectedEventType = expectedEventType; - } - PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) { this.groupId = groupId;