mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-10-13 16:31:00 +02:00
Report successful objective operation immediately once the flows/groups are translated
P4 pipeliners are fundamentally different from OpenFlow pipeliners. It can verify if the flow objective can be translated into a flow that is compatible with the pipeline. Once verified, it is almost certain the flow can be installed on the switch. The flow/group subsystem retry mechanism will take care of some rare cases where the flow/group doesn’t get in for the first time. Therefore, we only fail the objective if there is a translation error in the FabricPipeliner Change-Id: I868016c0859930fa15b9cdbacb6f72d8c3df307f
This commit is contained in:
parent
76e6386d41
commit
91ea972a04
@ -18,13 +18,9 @@ package org.onosproject.pipelines.fabric.pipeliner;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalCause;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.onlab.util.KryoNamespace;
|
||||
import org.onlab.util.Tools;
|
||||
import org.onosproject.core.GroupId;
|
||||
@ -38,7 +34,6 @@ import org.onosproject.net.driver.Driver;
|
||||
import org.onosproject.net.flow.FlowId;
|
||||
import org.onosproject.net.flow.FlowRule;
|
||||
import org.onosproject.net.flow.FlowRuleOperations;
|
||||
import org.onosproject.net.flow.FlowRuleOperationsContext;
|
||||
import org.onosproject.net.flow.FlowRuleService;
|
||||
import org.onosproject.net.flow.instructions.Instruction;
|
||||
import org.onosproject.net.flow.instructions.Instructions;
|
||||
@ -50,7 +45,6 @@ import org.onosproject.net.flowobjective.Objective;
|
||||
import org.onosproject.net.flowobjective.ObjectiveError;
|
||||
import org.onosproject.net.group.GroupDescription;
|
||||
import org.onosproject.net.group.GroupEvent;
|
||||
import org.onosproject.net.group.GroupListener;
|
||||
import org.onosproject.net.group.GroupService;
|
||||
import org.onosproject.store.serializers.KryoNamespaces;
|
||||
import org.slf4j.Logger;
|
||||
@ -63,7 +57,6 @@ import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -80,36 +73,20 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
.register(FabricNextGroup.class)
|
||||
.build("FabricPipeliner");
|
||||
|
||||
private static final Set<GroupEvent.Type> GROUP_FAILED_TYPES =
|
||||
Sets.newHashSet(GroupEvent.Type.GROUP_ADD_FAILED,
|
||||
GroupEvent.Type.GROUP_REMOVE_FAILED,
|
||||
GroupEvent.Type.GROUP_UPDATE_FAILED);
|
||||
|
||||
// TODO: make this configurable
|
||||
private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
|
||||
private static final int NUM_CALLBACK_THREAD = 2;
|
||||
|
||||
protected DeviceId deviceId;
|
||||
protected FlowRuleService flowRuleService;
|
||||
protected GroupService groupService;
|
||||
protected GroupListener groupListener = new InternalGroupListener();
|
||||
protected FlowObjectiveStore flowObjectiveStore;
|
||||
protected FabricFilteringPipeliner pipelinerFilter;
|
||||
protected FabricForwardingPipeliner pipelinerForward;
|
||||
protected FabricNextPipeliner pipelinerNext;
|
||||
FabricFilteringPipeliner pipelinerFilter;
|
||||
FabricForwardingPipeliner pipelinerForward;
|
||||
FabricNextPipeliner pipelinerNext;
|
||||
|
||||
private Map<PendingFlowKey, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
|
||||
private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
|
||||
private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
|
||||
.removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
|
||||
RemovalCause cause = removalNotification.getCause();
|
||||
PendingInstallObjective pio = removalNotification.getValue();
|
||||
if (cause == RemovalCause.EXPIRED && pio != null) {
|
||||
pio.failed(pio.objective, ObjectiveError.INSTALLATIONTIMEOUT);
|
||||
}
|
||||
})
|
||||
.build();
|
||||
private Map<Objective, PendingInstallObjective> pendingInstallObjectives = Maps.newConcurrentMap();
|
||||
|
||||
private static ExecutorService flowObjCallbackExecutor =
|
||||
Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
|
||||
|
||||
@ -120,7 +97,6 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
this.deviceId = deviceId;
|
||||
this.flowRuleService = context.directory().get(FlowRuleService.class);
|
||||
this.groupService = context.directory().get(GroupService.class);
|
||||
this.groupService.addListener(groupListener);
|
||||
this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
|
||||
this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
|
||||
this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
|
||||
@ -139,7 +115,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
if (error == null) {
|
||||
success(filterObjective);
|
||||
} else {
|
||||
fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
|
||||
log.info("Ignore error {}. Let flow subsystem retry", error);
|
||||
success(filterObjective);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -156,7 +133,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
if (error == null) {
|
||||
success(forwardObjective);
|
||||
} else {
|
||||
fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
|
||||
log.info("Ignore error {}. Let flow subsystem retry", error);
|
||||
success(forwardObjective);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -177,23 +155,31 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
return;
|
||||
}
|
||||
|
||||
if (nextObjective.op() == Objective.Operation.MODIFY) {
|
||||
// TODO: support MODIFY operation
|
||||
log.debug("Currently we don't support MODIFY operation, return failure directly to the context");
|
||||
fail(nextObjective, ObjectiveError.UNSUPPORTED);
|
||||
return;
|
||||
}
|
||||
|
||||
applyTranslationResult(nextObjective, result, error -> {
|
||||
if (error != null) {
|
||||
fail(nextObjective, error);
|
||||
log.info("Ignore error {}. Let flow/group subsystem retry", error);
|
||||
success(nextObjective);
|
||||
return;
|
||||
}
|
||||
|
||||
// Success, put next group to objective store
|
||||
List<PortNumber> portNumbers = Lists.newArrayList();
|
||||
nextObjective.next().forEach(treatment -> {
|
||||
nextObjective.next().forEach(treatment ->
|
||||
treatment.allInstructions()
|
||||
.stream()
|
||||
.filter(inst -> inst.type() == Instruction.Type.OUTPUT)
|
||||
.map(inst -> (Instructions.OutputInstruction) inst)
|
||||
.findFirst()
|
||||
.map(Instructions.OutputInstruction::port)
|
||||
.ifPresent(portNumbers::add);
|
||||
});
|
||||
.ifPresent(portNumbers::add)
|
||||
);
|
||||
FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
|
||||
portNumbers);
|
||||
flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
|
||||
@ -232,9 +218,9 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
pendingInstallObjectiveFlows.put(pfk, pio);
|
||||
});
|
||||
|
||||
pendingGroupKeys.forEach(pendingGroupKey -> {
|
||||
pendingInstallObjectiveGroups.put(pendingGroupKey, pio);
|
||||
});
|
||||
pendingGroupKeys.forEach(pendingGroupKey ->
|
||||
pendingInstallObjectiveGroups.put(pendingGroupKey, pio)
|
||||
);
|
||||
|
||||
pendingInstallObjectives.put(objective, pio);
|
||||
installGroups(objective, groups);
|
||||
@ -246,42 +232,17 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
return;
|
||||
}
|
||||
|
||||
FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
|
||||
@Override
|
||||
public void onSuccess(FlowRuleOperations ops) {
|
||||
ops.stages().forEach(stage -> {
|
||||
stage.forEach(op -> {
|
||||
FlowId flowId = op.rule().id();
|
||||
PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
|
||||
PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
|
||||
FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules);
|
||||
flowRuleService.apply(ops);
|
||||
|
||||
if (pio != null) {
|
||||
pio.flowInstalled(flowId);
|
||||
}
|
||||
});
|
||||
});
|
||||
flowRules.forEach(flow -> {
|
||||
PendingFlowKey pfk = new PendingFlowKey(flow.id(), objective.id());
|
||||
PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
|
||||
|
||||
if (pio != null) {
|
||||
pio.flowInstalled(flow.id());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(FlowRuleOperations ops) {
|
||||
log.warn("Failed to install flow rules: {}", flowRules);
|
||||
PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective);
|
||||
if (pio != null) {
|
||||
pio.failed(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
|
||||
if (ops != null) {
|
||||
flowRuleService.apply(ops);
|
||||
} else {
|
||||
// remove pendings
|
||||
flowRules.forEach(flowRule -> {
|
||||
PendingFlowKey pfk = new PendingFlowKey(flowRule.id(), objective.id());
|
||||
pendingInstallObjectiveFlows.remove(pfk);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void installGroups(Objective objective, Collection<GroupDescription> groups) {
|
||||
@ -297,41 +258,40 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
|
||||
break;
|
||||
case ADD_TO_EXISTING:
|
||||
groups.forEach(group -> {
|
||||
groupService.addBucketsToGroup(deviceId, group.appCookie(),
|
||||
group.buckets(),
|
||||
group.appCookie(),
|
||||
group.appId());
|
||||
});
|
||||
groups.forEach(group -> groupService.addBucketsToGroup(deviceId, group.appCookie(),
|
||||
group.buckets(), group.appCookie(), group.appId())
|
||||
);
|
||||
break;
|
||||
case REMOVE_FROM_EXISTING:
|
||||
groups.forEach(group -> {
|
||||
groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
|
||||
group.buckets(),
|
||||
group.appCookie(),
|
||||
group.appId());
|
||||
});
|
||||
groups.forEach(group -> groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
|
||||
group.buckets(), group.appCookie(), group.appId())
|
||||
);
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported objective operation {}", objective.op());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
static void fail(Objective objective, ObjectiveError error) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
objective.context().ifPresent(ctx -> ctx.onError(objective, error));
|
||||
}, flowObjCallbackExecutor);
|
||||
groups.forEach(group -> {
|
||||
PendingGroupKey pendingGroupKey = new PendingGroupKey(new GroupId(group.givenGroupId()), objective.op());
|
||||
PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
|
||||
pio.groupInstalled(pendingGroupKey);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
static void success(Objective objective) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
|
||||
}, flowObjCallbackExecutor);
|
||||
private static void fail(Objective objective, ObjectiveError error) {
|
||||
CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onError(objective, error)),
|
||||
flowObjCallbackExecutor);
|
||||
|
||||
}
|
||||
|
||||
static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
|
||||
FlowRuleOperationsContext ctx) {
|
||||
private static void success(Objective objective) {
|
||||
CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onSuccess(objective)),
|
||||
flowObjCallbackExecutor);
|
||||
}
|
||||
|
||||
private static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules) {
|
||||
FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
|
||||
switch (objective.op()) {
|
||||
case ADD:
|
||||
@ -347,23 +307,23 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
fail(objective, ObjectiveError.BADPARAMS);
|
||||
return null;
|
||||
}
|
||||
return ops.build(ctx);
|
||||
return ops.build();
|
||||
}
|
||||
|
||||
class FabricNextGroup implements NextGroup {
|
||||
private NextObjective.Type type;
|
||||
private Collection<PortNumber> outputPorts;
|
||||
|
||||
public FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
|
||||
FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
|
||||
this.type = type;
|
||||
this.outputPorts = ImmutableList.copyOf(outputPorts);
|
||||
}
|
||||
|
||||
public NextObjective.Type type() {
|
||||
NextObjective.Type type() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public Collection<PortNumber> outputPorts() {
|
||||
Collection<PortNumber> outputPorts() {
|
||||
return outputPorts;
|
||||
}
|
||||
|
||||
@ -373,32 +333,13 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
}
|
||||
}
|
||||
|
||||
class InternalGroupListener implements GroupListener {
|
||||
@Override
|
||||
public void event(GroupEvent event) {
|
||||
GroupId groupId = event.subject().id();
|
||||
PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type());
|
||||
PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
|
||||
if (GROUP_FAILED_TYPES.contains(event.type())) {
|
||||
pio.failed(pio.objective, ObjectiveError.GROUPINSTALLATIONFAILED);
|
||||
}
|
||||
pio.groupInstalled(pendingGroupKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRelevant(GroupEvent event) {
|
||||
PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type());
|
||||
return pendingInstallObjectiveGroups.containsKey(pendingGroupKey);
|
||||
}
|
||||
}
|
||||
|
||||
class PendingInstallObjective {
|
||||
Objective objective;
|
||||
Collection<FlowId> flowIds;
|
||||
Collection<PendingGroupKey> pendingGroupKeys;
|
||||
Consumer<ObjectiveError> callback;
|
||||
|
||||
public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
|
||||
PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
|
||||
Collection<PendingGroupKey> pendingGroupKeys,
|
||||
Consumer<ObjectiveError> callback) {
|
||||
this.objective = objective;
|
||||
@ -423,7 +364,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
|
||||
private void checkIfFinished() {
|
||||
if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
|
||||
pendingInstallObjectives.invalidate(objective);
|
||||
pendingInstallObjectives.remove(objective);
|
||||
callback.accept(null);
|
||||
}
|
||||
}
|
||||
@ -434,7 +375,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
pendingInstallObjectiveFlows.remove(pfk);
|
||||
});
|
||||
pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
|
||||
pendingInstallObjectives.invalidate(objective);
|
||||
pendingInstallObjectives.remove(objective);
|
||||
callback.accept(error);
|
||||
}
|
||||
|
||||
@ -509,11 +450,6 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
|
||||
private GroupId groupId;
|
||||
private GroupEvent.Type expectedEventType;
|
||||
|
||||
PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) {
|
||||
this.groupId = groupId;
|
||||
this.expectedEventType = expectedEventType;
|
||||
}
|
||||
|
||||
PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
|
||||
this.groupId = groupId;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user