Fix timeout problem of fabric pipeliner

Including a workaround for ONOS-7785

Change-Id: I867161f5edf63e82c42a731a4b107ea326d4675c
This commit is contained in:
Yi Tseng 2018-08-19 03:09:54 +08:00 committed by Charles Chan
parent 8a715f84c5
commit fe13f3e634
3 changed files with 132 additions and 55 deletions

View File

@ -472,6 +472,11 @@ public class SegmentRoutingManager implements SegmentRoutingService {
"staleLinkAge", "15000");
compCfgService.preSetProperty("org.onosproject.net.host.impl.HostManager",
"allowDuplicateIps", "false");
// For P4 switches
compCfgService.preSetProperty("org.onosproject.net.flow.impl.FlowRuleManager",
"fallbackFlowPollFrequency", "5");
compCfgService.preSetProperty("org.onosproject.net.group.impl.GroupManager",
"fallbackGroupPollFrequency", "5");
compCfgService.registerProperties(getClass());
modified(context);

View File

@ -19,12 +19,12 @@ package org.onosproject.drivers.p4runtime;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Striped;
import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMulticastGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
import org.onosproject.net.DeviceId;
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupOperation;
@ -116,17 +116,23 @@ public class P4RuntimeGroupProgrammable
if (!setupBehaviour()) {
return;
}
groupOps.operations().stream()
// Get group type and operation type
.map(op -> Pair.of(groupStore.getGroup(deviceId, op.groupId()),
op.opType()))
.forEach(pair -> {
if (pair.getLeft().type().equals(GroupDescription.Type.ALL)) {
processMcGroupOp(deviceId, pair.getLeft(), pair.getRight());
} else {
processGroupOp(deviceId, pair.getLeft(), pair.getRight());
}
});
groupOps.operations().forEach(op -> {
// ONOS-7785 We need app cookie (action profile id) from the group
Group groupOnStore = groupStore.getGroup(deviceId, op.groupId());
GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
op.groupType(),
op.buckets(),
groupOnStore.appCookie(),
op.groupId().id(),
groupOnStore.appId());
DefaultGroup groupToApply = new DefaultGroup(op.groupId(), groupDesc);
if (op.groupType().equals(GroupDescription.Type.ALL)) {
processMcGroupOp(deviceId, groupToApply, op.opType());
} else {
processGroupOp(deviceId, groupToApply, op.opType());
}
});
}
@Override

View File

@ -16,6 +16,8 @@
package org.onosproject.pipelines.fabric.pipeliner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
@ -24,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
@ -56,7 +59,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -81,6 +87,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
// TODO: make this configurable
private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
private static final int NUM_CALLBACK_THREAD = 2;
protected DeviceId deviceId;
protected FlowRuleService flowRuleService;
@ -92,7 +99,7 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
protected FabricNextPipeliner pipelinerNext;
private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
.expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
.removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
@ -103,6 +110,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
}
})
.build();
private static ExecutorService flowObjCallbackExecutor =
Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
@Override
@ -126,8 +135,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
return;
}
applyTranslationResult(filterObjective, result, success -> {
if (success) {
applyTranslationResult(filterObjective, result, error -> {
if (error == null) {
success(filterObjective);
} else {
fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@ -143,8 +152,8 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
return;
}
applyTranslationResult(forwardObjective, result, success -> {
if (success) {
applyTranslationResult(forwardObjective, result, error -> {
if (error == null) {
success(forwardObjective);
} else {
fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@ -168,25 +177,22 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
return;
}
applyTranslationResult(nextObjective, result, success -> {
if (!success) {
fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
applyTranslationResult(nextObjective, result, error -> {
if (error != null) {
fail(nextObjective, error);
return;
}
// Success, put next group to objective store
List<PortNumber> portNumbers = Lists.newArrayList();
nextObjective.next().forEach(treatment -> {
Instructions.OutputInstruction outputInst = treatment.allInstructions()
treatment.allInstructions()
.stream()
.filter(inst -> inst.type() == Instruction.Type.OUTPUT)
.map(inst -> (Instructions.OutputInstruction) inst)
.findFirst()
.orElse(null);
if (outputInst != null) {
portNumbers.add(outputInst.port());
}
.map(Instructions.OutputInstruction::port)
.ifPresent(portNumbers::add);
});
FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
portNumbers);
@ -208,23 +214,25 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
private void applyTranslationResult(Objective objective,
PipelinerTranslationResult result,
Consumer<Boolean> callback) {
Consumer<ObjectiveError> callback) {
Collection<GroupDescription> groups = result.groups();
Collection<FlowRule> flowRules = result.flowRules();
Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
.map(GroupId::new).collect(Collectors.toSet());
Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
.map(GroupId::new)
.map(gid -> new PendingGroupKey(gid, objective.op()))
.collect(Collectors.toSet());
PendingInstallObjective pio =
new PendingInstallObjective(objective, flowIds, groupIds, callback);
new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
flowIds.forEach(flowId -> {
pendingInstallObjectiveFlows.put(flowId, pio);
});
groupIds.forEach(groupId -> {
pendingInstallObjectiveGroups.put(groupId, pio);
pendingGroupKeys.forEach(pendingGroupKey -> {
pendingInstallObjectiveGroups.put(pendingGroupKey, pio);
});
pendingInstallObjectives.put(objective, pio);
@ -305,11 +313,16 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
}
static void fail(Objective objective, ObjectiveError error) {
objective.context().ifPresent(ctx -> ctx.onError(objective, error));
CompletableFuture.runAsync(() -> {
objective.context().ifPresent(ctx -> ctx.onError(objective, error));
}, flowObjCallbackExecutor);
}
static void success(Objective objective) {
objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
CompletableFuture.runAsync(() -> {
objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
}, flowObjCallbackExecutor);
}
static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
@ -317,16 +330,13 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
switch (objective.op()) {
case ADD:
case ADD_TO_EXISTING: // For egress VLAN
flowRules.forEach(ops::add);
break;
case REMOVE:
case REMOVE_FROM_EXISTING: // For egress VLAN
flowRules.forEach(ops::remove);
break;
case ADD_TO_EXISTING:
case REMOVE_FROM_EXISTING:
// Next objective may use ADD_TO_EXIST or REMOVE_FROM_EXIST op
// No need to update FlowRuls for vlan_meta table.
return null;
default:
log.warn("Unsupported op {} for {}", objective.op(), objective);
fail(objective, ObjectiveError.BADPARAMS);
@ -362,33 +372,33 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
@Override
public void event(GroupEvent event) {
GroupId groupId = event.subject().id();
PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
if (pio == null) {
return;
}
PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type());
PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
if (GROUP_FAILED_TYPES.contains(event.type())) {
pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
}
pio.groupInstalled(groupId);
pio.groupInstalled(pendingGroupKey);
}
@Override
public boolean isRelevant(GroupEvent event) {
return pendingInstallObjectiveGroups.containsKey(event.subject().id());
PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type());
return pendingInstallObjectiveGroups.containsKey(pendingGroupKey);
}
}
class PendingInstallObjective {
Objective objective;
Collection<FlowId> flowIds;
Collection<GroupId> groupIds;
Consumer<Boolean> callback;
Collection<PendingGroupKey> pendingGroupKeys;
Consumer<ObjectiveError> callback;
public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
Collection<GroupId> groupIds, Consumer<Boolean> callback) {
Collection<PendingGroupKey> pendingGroupKeys,
Consumer<ObjectiveError> callback) {
this.objective = objective;
this.flowIds = flowIds;
this.groupIds = groupIds;
this.pendingGroupKeys = pendingGroupKeys;
this.callback = callback;
}
@ -397,23 +407,79 @@ public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeli
checkIfFinished();
}
void groupInstalled(GroupId groupId) {
groupIds.remove(groupId);
void groupInstalled(PendingGroupKey pendingGroupKey) {
pendingGroupKeys.remove(pendingGroupKey);
checkIfFinished();
}
private void checkIfFinished() {
if (flowIds.isEmpty() && groupIds.isEmpty()) {
if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
pendingInstallObjectives.invalidate(objective);
callback.accept(true);
callback.accept(null);
}
}
void failed(ObjectiveError error) {
flowIds.forEach(pendingInstallObjectiveFlows::remove);
groupIds.forEach(pendingInstallObjectiveGroups::remove);
pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
pendingInstallObjectives.invalidate(objective);
fail(objective, error);
callback.accept(error);
}
}
class PendingGroupKey {
private GroupId groupId;
private GroupEvent.Type expectedEventType;
PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) {
this.groupId = groupId;
this.expectedEventType = expectedEventType;
}
PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
this.groupId = groupId;
switch (objOp) {
case ADD:
expectedEventType = GroupEvent.Type.GROUP_ADDED;
break;
case REMOVE:
expectedEventType = GroupEvent.Type.GROUP_REMOVED;
break;
case MODIFY:
case ADD_TO_EXISTING:
case REMOVE_FROM_EXISTING:
expectedEventType = GroupEvent.Type.GROUP_UPDATED;
break;
default:
expectedEventType = null;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
return Objects.equal(groupId, pendingGroupKey.groupId) &&
expectedEventType == pendingGroupKey.expectedEventType;
}
@Override
public int hashCode() {
return Objects.hashCode(groupId, expectedEventType);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("groupId", groupId)
.add("expectedEventType", expectedEventType)
.toString();
}
}
}