CORD-1583 Bug fixes for dual ToRs

Two things:
  - In dual (paired) ToR scenarios it is possible to have the same outport
    in multiple buckets in a hash group, as long as they have different labels.
    When adding buckets this was taken into account. But when removing buckets,
    only outport was being checked. This bug fix ensures that labels are checked
    as well when removing buckets.
  - In dual ToR scenarios, getting the right set of hash buckets proved difficult
    with the existing 'retryHash' mechanism. It has been repealed and replaced with a
    bucket corrector mechanism that periodically corrects the hash group buckets when
    the topology has been stable for the last 10 secs. This required the introduction
    of a VERIFY operation in Next Objectives. Also added a CLI command to trigger this
    operation manually.

Change-Id: Ib0d2734060fadc6e7a4bd0d75f3409e194413a97
This commit is contained in:
Saurav Das 2017-08-03 18:30:35 -07:00 committed by Ray Milkey
parent 946721120b
commit ceccf24adc
14 changed files with 560 additions and 240 deletions

View File

@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.joda.time.DateTime;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpPrefix;
@ -61,6 +63,7 @@ public class DefaultRoutingHandler {
private static final int MAX_CONSTANT_RETRY_ATTEMPTS = 5;
private static final int RETRY_INTERVAL_MS = 250;
private static final int RETRY_INTERVAL_SCALE = 1;
private static final long STABLITY_THRESHOLD = 10; //secs
private static Logger log = LoggerFactory.getLogger(DefaultRoutingHandler.class);
private SegmentRoutingManager srManager;
@ -72,6 +75,7 @@ public class DefaultRoutingHandler {
private volatile Status populationStatus;
private ScheduledExecutorService executorService
= newScheduledThreadPool(1, groupedThreads("retryftr", "retry-%d", log));
private DateTime lastRoutingChange;
/**
* Represents the default routing population status.
@ -120,6 +124,35 @@ public class DefaultRoutingHandler {
return builder.build();
}
/**
* Acquires the lock used when making routing changes. Delegates directly to
* {@code statusLock.lock()}. Every call should be paired with a call to
* {@link #releaseRoutingLock()}, preferably from a {@code finally} block, so
* that the lock is not left held if the guarded work throws.
*/
public void acquireRoutingLock() {
statusLock.lock();
}
/**
* Releases the lock used when making routing changes. Delegates directly to
* {@code statusLock.unlock()}; intended to be called by the same caller that
* previously invoked {@link #acquireRoutingLock()}.
*/
public void releaseRoutingLock() {
statusLock.unlock();
}
/**
 * Determines if routing in the network has been stable in the last
 * STABLITY_THRESHOLD seconds, by comparing the current time to the last
 * routing change timestamp.
 * <p>
 * If no routing change has been recorded yet (the timestamp is still
 * {@code null}), routing is conservatively reported as not stable instead
 * of failing with a {@link NullPointerException}. This matters for callers
 * running on a periodic executor, where an uncaught exception would
 * silently cancel all future executions.
 *
 * @return true if the last recorded routing change is more than
 *         STABLITY_THRESHOLD seconds old; false otherwise, or if no routing
 *         change has been recorded yet
 */
public boolean isRoutingStable() {
    // Read the field once so the null check and the use see the same value.
    DateTime lastChange = lastRoutingChange;
    if (lastChange == null) {
        log.debug("No routing change recorded yet; reporting routing as not stable");
        return false;
    }
    // Whole seconds, truncated, exactly as the previous floating-point form did.
    long last = lastChange.getMillis() / 1000;
    long now = DateTime.now().getMillis() / 1000;
    log.debug("Routing stable since {}s", now - last);
    return (now - last) > STABLITY_THRESHOLD;
}
//////////////////////////////////////
// Route path handling
//////////////////////////////////////
@ -136,6 +169,7 @@ public class DefaultRoutingHandler {
* startup or after a configuration event.
*/
public void populateAllRoutingRules() {
lastRoutingChange = DateTime.now();
statusLock.lock();
try {
if (populationStatus == Status.STARTED) {
@ -205,6 +239,7 @@ public class DefaultRoutingHandler {
* @param subnets subnets being added
*/ //XXX refactor
protected void populateSubnet(Set<ConnectPoint> cpts, Set<IpPrefix> subnets) {
lastRoutingChange = DateTime.now();
statusLock.lock();
try {
if (populationStatus == Status.STARTED) {
@ -328,7 +363,7 @@ public class DefaultRoutingHandler {
log.warn("Only one event can be handled for link status change .. aborting");
return;
}
lastRoutingChange = DateTime.now();
statusLock.lock();
try {

View File

@ -93,7 +93,7 @@ public class IcmpHandler extends SegmentRoutingNeighbourHandler {
treatment, ByteBuffer.wrap(payload.serialize()));
srManager.packetService.emit(packet);
} else {
log.debug("Send a MPLS packet as a ICMP response");
log.trace("Send a MPLS packet as a ICMP response");
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setOutput(outport.port())
.build();

View File

@ -757,7 +757,7 @@ public class RoutingRulePopulator {
return false;
}
} else {
// Unconfigure port, use INTERNAL_VLAN
// Unconfigured port, use INTERNAL_VLAN
if (!processSinglePortFiltersInternal(deviceId, portnum, true, INTERNAL_VLAN, install)) {
return false;
}

View File

@ -522,6 +522,14 @@ public class SegmentRoutingManager implements SegmentRoutingService {
}
}
/**
 * Asks the group handler of the given device to run a one-time bucket
 * verification. Does nothing when no group handler is registered for the
 * device.
 *
 * @param id the device identifier
 */
@Override
public void verifyGroups(DeviceId id) {
    DefaultGroupHandler handler = groupHandlerMap.get(id);
    if (handler == null) {
        // no handler known for this device; nothing to verify
        return;
    }
    handler.triggerBucketCorrector();
}
/**
* Extracts the application ID from the manager.
*
@ -760,6 +768,15 @@ public class SegmentRoutingManager implements SegmentRoutingService {
return groupHandlerMap.get(devId);
}
/**
* Returns the default routing handler object held by this manager.
* NOTE(review): BucketCorrector null-checks the returned value, so this
* presumably can be null before the handler has been created - confirm.
*
* @return the default routing handler object
*/
public DefaultRoutingHandler getRoutingHandler() {
return defaultRoutingHandler;
}
/**
* Returns true if this controller instance has seen this link before. The
* link may not be currently up, but as long as the link had been seen before
@ -1249,7 +1266,10 @@ public class SegmentRoutingManager implements SegmentRoutingService {
seenLinks.keySet().removeIf(key -> key.src().deviceId().equals(device.id()) ||
key.dst().deviceId().equals(device.id()));
groupHandlerMap.remove(device.id());
DefaultGroupHandler gh = groupHandlerMap.remove(device.id());
if (gh != null) {
gh.shutdown();
}
defaultRoutingHandler.purgeEcmpGraph(device.id());
// Note that a switch going down is associated with all of its links
// going down as well, but it is treated as a single switch down event

View File

@ -134,4 +134,13 @@ public interface SegmentRoutingService {
* @return current contents of the destinationSetNextObjectiveStore
*/
ImmutableMap<DestinationSetNextObjectiveStoreKey, NextNeighbors> getDestinationSet();
/**
* Triggers the verification of all ECMP groups in the specified device.
* Adjusts the group buckets if verification finds that there are more or
* fewer buckets than what should be there.
*
* @param id the device identifier
*/
void verifyGroups(DeviceId id);
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.segmentrouting.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.segmentrouting.SegmentRoutingService;
/**
* Triggers the verification of hashed group buckets in the specified device,
* and corrects the buckets if necessary. Outcome can be viewed in the 'groups'
* command.
*/
@Command(scope = "onos", name = "sr-verify-groups",
description = "Triggers the verification of hashed groups in the specified "
+ "device. Does not return any output; users can query the results "
+ "in the 'groups' command")
public class VerifyGroupsCommand extends AbstractShellCommand {
@Argument(index = 0, name = "uri", description = "Device ID",
required = true, multiValued = false)
String uri = null;
@Override
protected void execute() {
DeviceService deviceService = get(DeviceService.class);
SegmentRoutingService srService =
AbstractShellCommand.get(SegmentRoutingService.class);
if (uri != null) {
Device dev = deviceService.getDevice(DeviceId.deviceId(uri));
if (dev != null) {
srService.verifyGroups(dev.id());
}
}
}
}

View File

@ -40,6 +40,7 @@ import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.link.LinkService;
import org.onosproject.segmentrouting.DefaultRoutingHandler;
import org.onosproject.segmentrouting.SegmentRoutingManager;
import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
import org.onosproject.segmentrouting.config.DeviceProperties;
@ -76,6 +77,8 @@ import static org.slf4j.LoggerFactory.getLogger;
public class DefaultGroupHandler {
protected static final Logger log = getLogger(DefaultGroupHandler.class);
private static final long VERIFY_INTERVAL = 30; // secs
protected final DeviceId deviceId;
protected final ApplicationId appId;
protected final DeviceProperties deviceConfig;
@ -109,9 +112,8 @@ public class DefaultGroupHandler {
portNextObjStore = null;
private SegmentRoutingManager srManager;
private static final long RETRY_INTERVAL_SEC = 30;
private ScheduledExecutorService executorService
= newScheduledThreadPool(1, groupedThreads("retryhashbkts", "retry-%d", log));
= newScheduledThreadPool(1, groupedThreads("bktCorrector", "bktC-%d", log));
protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
.register(URI.class).register(HashSet.class)
@ -145,10 +147,20 @@ public class DefaultGroupHandler {
this.vlanNextObjStore = srManager.vlanNextObjStore();
this.portNextObjStore = srManager.portNextObjStore();
this.srManager = srManager;
executorService.scheduleWithFixedDelay(new BucketCorrector(), 10,
VERIFY_INTERVAL,
TimeUnit.SECONDS);
populateNeighborMaps();
}
/**
* Gracefully shuts down a groupHandler. Typically called when the handler is
* no longer needed. Shuts down this handler's executor service, on which the
* constructor schedules the periodic BucketCorrector task.
* NOTE(review): with the JDK's default scheduled-executor policy, shutdown()
* also stops further periodic runs - confirm no custom policy is set.
*/
public void shutdown() {
executorService.shutdown();
}
/**
* Creates a group handler object.
*
@ -162,13 +174,13 @@ public class DefaultGroupHandler {
* @return default group handler type
*/
public static DefaultGroupHandler createGroupHandler(
DeviceId deviceId,
ApplicationId appId,
DeviceProperties config,
LinkService linkService,
FlowObjectiveService flowObjService,
SegmentRoutingManager srManager)
throws DeviceConfigNotFoundException {
DeviceId deviceId,
ApplicationId appId,
DeviceProperties config,
LinkService linkService,
FlowObjectiveService flowObjService,
SegmentRoutingManager srManager)
throws DeviceConfigNotFoundException {
return new DefaultGroupHandler(deviceId, appId, config,
linkService,
flowObjService,
@ -181,35 +193,35 @@ public class DefaultGroupHandler {
* @param link the infrastructure link
*/
public void portUpForLink(Link link) {
if (!link.src().deviceId().equals(deviceId)) {
log.warn("linkUp: deviceId{} doesn't match with link src {}",
deviceId, link.src().deviceId());
return;
}
if (!link.src().deviceId().equals(deviceId)) {
log.warn("linkUp: deviceId{} doesn't match with link src {}",
deviceId, link.src().deviceId());
return;
}
log.info("* portUpForLink: Device {} linkUp at local port {} to "
+ "neighbor {}", deviceId, link.src().port(), link.dst().deviceId());
// ensure local state is updated even if linkup is aborted later on
addNeighborAtPort(link.dst().deviceId(),
link.src().port());
}
log.info("* portUpForLink: Device {} linkUp at local port {} to "
+ "neighbor {}", deviceId, link.src().port(), link.dst().deviceId());
// ensure local state is updated even if linkup is aborted later on
addNeighborAtPort(link.dst().deviceId(),
link.src().port());
}
/**
* Updates local stores for port that has gone down.
*
* @param port port number that has gone down
*/
public void portDown(PortNumber port) {
if (portDeviceMap.get(port) == null) {
log.warn("portDown: unknown port");
return;
}
/**
* Updates local stores for port that has gone down.
*
* @param port port number that has gone down
*/
public void portDown(PortNumber port) {
if (portDeviceMap.get(port) == null) {
log.warn("portDown: unknown port");
return;
}
log.debug("Device {} portDown {} to neighbor {}", deviceId, port,
portDeviceMap.get(port));
devicePortMap.get(portDeviceMap.get(port)).remove(port);
portDeviceMap.remove(port);
}
log.debug("Device {} portDown {} to neighbor {}", deviceId, port,
portDeviceMap.get(port));
devicePortMap.get(portDeviceMap.get(port)).remove(port);
portDeviceMap.remove(port);
}
/**
* Checks all groups in the src-device of link for neighbor sets that include
@ -255,7 +267,7 @@ public class DefaultGroupHandler {
dstSet.forEach(dst -> {
int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
addToHashedNextObjective(link.src().port(), dstMac,
edgeLabel, nextId, false);
edgeLabel, nextId);
});
if (firstTime) {
@ -269,8 +281,7 @@ public class DefaultGroupHandler {
}
dstSet.forEach(dst -> {
int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
addToHashedNextObjective(p, dstMac, edgeLabel,
nextId, false);
addToHashedNextObjective(p, dstMac, edgeLabel, nextId);
});
}
}
@ -282,17 +293,6 @@ public class DefaultGroupHandler {
});
}
}
// It's possible that at the time of linkup, some hash-groups have
// not been created yet by the instance responsible for creating them, or
// due to the eventually-consistent nature of the nsNextObjStore it has
// not synced up with this instance yet. Thus we perform this check again
// after a delay (see CORD-1180). Duplicate additions to the same hash group
// are avoided by the driver.
if (!linkDown && firstTime) {
executorService.schedule(new RetryHashBkts(link, dstMac),
RETRY_INTERVAL_SEC, TimeUnit.SECONDS);
}
}
/**
@ -303,11 +303,10 @@ public class DefaultGroupHandler {
* @param dstMac destination mac address of next-hop
* @param edgeLabel the label to use in the bucket
* @param nextId id for next-objective to which the bucket will be added
* @param retry indicates if this method is being called on a retry attempt
* at adding a bucket to the group
*
*/
private void addToHashedNextObjective(PortNumber outport, MacAddress dstMac,
int edgeLabel, Integer nextId, boolean retry) {
int edgeLabel, Integer nextId) {
// Create the new bucket to be updated
TrafficTreatment.Builder tBuilder =
DefaultTrafficTreatment.builder();
@ -331,63 +330,60 @@ public class DefaultGroupHandler {
.addTreatment(tBuilder.build())
.withMeta(metabuilder.build())
.fromApp(appId);
log.debug("{} in device {}: Adding Bucket with port/label {}/{} to nextId {}",
(retry) ? "retry-addToHash" : "addToHash",
deviceId, outport, edgeLabel, nextId);
log.debug("addToHash in device {}: Adding Bucket with port/label {}/{} "
+ "to nextId {}", deviceId, outport, edgeLabel, nextId);
ObjectiveContext context = new DefaultObjectiveContext(
(objective) -> log.debug("{} addedTo NextObj {} on {}",
(retry) ? "retry-addToHash" : "addToHash",
(objective) -> log.debug("addToHash addedTo NextObj {} on {}",
nextId, deviceId),
(objective, error) ->
log.warn("{} failed to addTo NextObj {} on {}: {}",
(retry) ? "retry-addToHash" : "addToHash",
log.warn("addToHash failed to addTo NextObj {} on {}: {}",
nextId, deviceId, error));
NextObjective nextObjective = nextObjBuilder.addToExisting(context);
flowObjectiveService.next(deviceId, nextObjective);
}
/**
* Makes a call to the FlowObjective service to remove a single bucket from
* a hashed group.
*
* @param port port to remove from hash group
* @param dstMac destination mac address of next-hop
* @param edgeLabel the label to use in the bucket
* @param nextId id for next-objective from which the bucket will be removed
*/
private void removeFromHashedNextObjective(PortNumber port, MacAddress dstMac,
int edgeLabel, Integer nextId) {
// Create the bucket to be removed
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
.builder();
tBuilder.setOutput(port)
.setEthDst(dstMac)
.setEthSrc(nodeMacAddr);
if (edgeLabel != DestinationSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.copyTtlOut()
.setMpls(MplsLabel.mplsLabel(edgeLabel));
}
log.info("{} in device {}: Removing Bucket with Port {} to next object id {}",
"removeFromHash", deviceId, port, nextId);
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder()
.withType(NextObjective.Type.HASHED) //same as original
.withId(nextId)
.fromApp(appId)
.addTreatment(tBuilder.build());
ObjectiveContext context = new DefaultObjectiveContext(
(objective) -> log.debug("port {} removedFrom NextObj {} on {}",
port, nextId, deviceId),
(objective, error) ->
log.warn("port {} failed to removeFrom NextObj {} on {}: {}",
port, nextId, deviceId, error));
NextObjective nextObjective = nextObjBuilder.
removeFromExisting(context);
* Makes a call to the FlowObjective service to remove a single bucket from
* a hashed group.
*
* @param port port to remove from hash group
* @param dstMac destination mac address of next-hop
* @param edgeLabel the label to use in the bucket
* @param nextId id for next-objective from which the bucket will be removed
*/
private void removeFromHashedNextObjective(PortNumber port, MacAddress dstMac,
int edgeLabel, Integer nextId) {
// Create the bucket to be removed
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
.builder();
tBuilder.setOutput(port)
.setEthDst(dstMac)
.setEthSrc(nodeMacAddr);
if (edgeLabel != DestinationSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.copyTtlOut()
.setMpls(MplsLabel.mplsLabel(edgeLabel));
}
log.info("{} in device {}: Removing Bucket with Port {} to next object id {}",
"removeFromHash", deviceId, port, nextId);
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder()
.withType(NextObjective.Type.HASHED) //same as original
.withId(nextId)
.fromApp(appId)
.addTreatment(tBuilder.build());
ObjectiveContext context = new DefaultObjectiveContext(
(objective) -> log.debug("port {} removedFrom NextObj {} on {}",
port, nextId, deviceId),
(objective, error) ->
log.warn("port {} failed to removeFrom NextObj {} on {}: {}",
port, nextId, deviceId, error));
NextObjective nextObjective = nextObjBuilder.
removeFromExisting(context);
flowObjectiveService.next(deviceId, nextObjective);
}
flowObjectiveService.next(deviceId, nextObjective);
}
/**
* Checks all the hash-groups in the target-switch meant for the destination
@ -468,9 +464,7 @@ public class DefaultGroupHandler {
log.info("fixHashGroup in device {}: Adding Bucket "
+ "with Port {} to next object id {}",
deviceId, port, nextId);
addToHashedNextObjective(port, dstMac,
edgeLabel,
nextId, false);
addToHashedNextObjective(port, dstMac, edgeLabel, nextId);
}
// to update neighbor set with changes made
tempStore.put(dskey, Sets.union(currNeighbors, diff));
@ -500,8 +494,18 @@ public class DefaultGroupHandler {
return true;
}
public boolean updateNextHops(DestinationSet ds,
/**
* Updates the DestinationSetNextObjectiveStore with any per-destination nexthops
* that are not already in the store for the given DestinationSet. Note that
* this method does not remove existing next hops for the destinations in the
* DestinationSet.
*
* @param ds the DestinationSet for which the next hops need to be updated
* @param newDstNextHops a map of per-destination next hops to update the
* destinationSet with
* @return true if successful in updating all next hops
*/
private boolean updateNextHops(DestinationSet ds,
Map<DeviceId, Set<DeviceId>> newDstNextHops) {
DestinationSetNextObjectiveStoreKey key =
new DestinationSetNextObjectiveStoreKey(deviceId, ds);
@ -538,9 +542,20 @@ public class DefaultGroupHandler {
return success;
}
private boolean updateAllPortsToNextHop(Set<DeviceId> diff, int edgeLabel,
/**
* Adds or removes buckets for all ports to a set of neighbor devices.
*
* @param neighbors set of neighbor device ids
* @param edgeLabel MPLS label to use in buckets
* @param nextId the nextObjective to change
* @param revoke true if buckets need to be removed, false if they need to
* be added
* @return true if successful in adding or removing buckets for all ports
* to the neighbors
*/
private boolean updateAllPortsToNextHop(Set<DeviceId> neighbors, int edgeLabel,
int nextId, boolean revoke) {
for (DeviceId neighbor : diff) {
for (DeviceId neighbor : neighbors) {
MacAddress dstMac;
try {
dstMac = deviceConfig.getDeviceMac(neighbor);
@ -570,16 +585,13 @@ public class DefaultGroupHandler {
log.debug("fixHashGroup in device {}: Adding Bucket "
+ "with Port {} edgeLabel: {} to next object id {}",
deviceId, port, edgeLabel, nextId);
addToHashedNextObjective(port, dstMac,
edgeLabel,
nextId, false);
addToHashedNextObjective(port, dstMac, edgeLabel, nextId);
}
}
}
return true;
}
/**
* Adds or removes a port that has been configured with a vlan to a broadcast group
* for bridging. Should only be called by the master instance for this device.
@ -1101,46 +1113,115 @@ public class DefaultGroupHandler {
}
}*/ //XXX revisit
/**
 * Runs a single bucket verification pass over the hash groups of this
 * device. The corrector is constructed without a specific next id and is
 * executed directly on the calling thread, not on the handler's executor.
 */
public void triggerBucketCorrector() {
    new BucketCorrector().run();
}
/**
* RetryHashBkts is a one-time retry at populating all the buckets of a
* hash group based on the given link. Should only be called by the
* master instance of the src-device of the link.
*
*
*/
protected final class RetryHashBkts implements Runnable {
Link link;
MacAddress dstMac;
protected final class BucketCorrector implements Runnable {
Integer nextId;
private RetryHashBkts(Link link, MacAddress dstMac) {
this.link = link;
this.dstMac = dstMac;
BucketCorrector() {
this.nextId = null;
}
BucketCorrector(Integer nextId) {
this.nextId = nextId;
}
@Override
public void run() {
log.debug("RETRY Hash buckets for linkup: {}", link);
Set<DestinationSetNextObjectiveStoreKey> dsKeySet = dsNextObjStore.entrySet()
.stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
.filter(entry -> entry.getValue().containsNextHop(link.dst().deviceId()))
.map(entry -> entry.getKey())
.collect(Collectors.toSet());
log.debug("retry-link: dsNextObjStore contents for device {}: {}",
deviceId, dsKeySet);
for (DestinationSetNextObjectiveStoreKey dsKey : dsKeySet) {
NextNeighbors next = dsNextObjStore.get(dsKey);
if (next != null) {
Set<DeviceId> dstSet = next.getDstForNextHop(link.dst().deviceId());
dstSet.forEach(dst -> {
int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
addToHashedNextObjective(link.src().port(), dstMac, edgeLabel,
next.nextId(), true);
});
}
if (!srManager.mastershipService.isLocalMaster(deviceId)) {
return;
}
DefaultRoutingHandler rh = srManager.getRoutingHandler();
if (rh == null) {
return;
}
if (!rh.isRoutingStable()) {
return;
}
rh.acquireRoutingLock();
try {
log.debug("running bucket corrector for dev: {}", deviceId);
Set<DestinationSetNextObjectiveStoreKey> dsKeySet = dsNextObjStore.entrySet()
.stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
.map(entry -> entry.getKey())
.collect(Collectors.toSet());
for (DestinationSetNextObjectiveStoreKey dsKey : dsKeySet) {
NextNeighbors next = dsNextObjStore.get(dsKey);
if (next == null) {
continue;
}
int nid = next.nextId();
if (nextId != null && nextId != nid) {
continue;
}
log.debug("bkt-corr: dsNextObjStore for device {}: {}",
deviceId, dsKey, next);
TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
metabuilder.matchVlanId(INTERNAL_VLAN);
NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
.withId(nid)
.withType(NextObjective.Type.HASHED)
.withMeta(metabuilder.build())
.fromApp(appId);
next.dstNextHops().forEach((dstDev, nextHops) -> {
int edgeLabel = dsKey.destinationSet().getEdgeLabel(dstDev);
nextHops.forEach(neighbor -> {
MacAddress neighborMac;
try {
neighborMac = deviceConfig.getDeviceMac(neighbor);
} catch (DeviceConfigNotFoundException e) {
log.warn(e.getMessage() + " Aborting neighbor"
+ neighbor);
return;
}
devicePortMap.get(neighbor).forEach(port -> {
log.debug("verify in device {} nextId {}: bucket with"
+ " port/label {}/{} to dst {} via {}",
deviceId, nid, port, edgeLabel,
dstDev, neighbor);
nextObjBuilder.addTreatment(treatmentBuilder(port,
neighborMac, edgeLabel));
});
});
});
NextObjective nextObjective = nextObjBuilder.verify();
flowObjectiveService.next(deviceId, nextObjective);
}
} finally {
rh.releaseRoutingLock();
}
}
/**
 * Assembles the traffic treatment describing one hash-group bucket: output
 * on the given port with rewritten destination and source MAC addresses
 * and, when an edge label applies, an MPLS push carrying that label.
 *
 * @param outport the port the bucket sends traffic out of
 * @param dstMac the destination MAC address to set
 * @param edgeLabel the MPLS label to push, or
 *                  {@code DestinationSet.NO_EDGE_LABEL} for no label
 * @return the built treatment
 */
TrafficTreatment treatmentBuilder(PortNumber outport, MacAddress dstMac,
                                  int edgeLabel) {
    boolean needsLabel = edgeLabel != DestinationSet.NO_EDGE_LABEL;
    TrafficTreatment.Builder bucket = DefaultTrafficTreatment.builder();
    bucket.setOutput(outport);
    bucket.setEthDst(dstMac);
    // source MAC is always this device's own MAC
    bucket.setEthSrc(nodeMacAddr);
    if (needsLabel) {
        bucket.pushMpls();
        bucket.copyTtlOut();
        bucket.setMpls(MplsLabel.mplsLabel(edgeLabel));
    }
    return bucket.build();
}
}
}
}

View File

@ -48,7 +48,16 @@
<command>
<action class="org.onosproject.segmentrouting.cli.NextHopCommand"/>
</command>
<command>
<action class="org.onosproject.segmentrouting.cli.VerifyGroupsCommand"/>
<completers>
<ref component-id="deviceIdCompleter"/>
</completers>
</command>
</command-bundle>
<bean id="deviceIdCompleter" class="org.onosproject.cli.net.DeviceIdCompleter"/>
</blueprint>

View File

@ -247,25 +247,12 @@ public final class DefaultNextObjective implements NextObjective {
@Override
public NextObjective add() {
treatments = listBuilder.build();
op = Operation.ADD;
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
return new DefaultNextObjective(this);
return add(null);
}
@Override
public NextObjective remove() {
treatments = listBuilder.build();
op = Operation.REMOVE;
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
return new DefaultNextObjective(this);
return remove(null);
}
@Override
@ -295,25 +282,12 @@ public final class DefaultNextObjective implements NextObjective {
@Override
public NextObjective addToExisting() {
treatments = listBuilder.build();
op = Operation.ADD_TO_EXISTING;
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
return new DefaultNextObjective(this);
return addToExisting(null);
}
@Override
public NextObjective removeFromExisting() {
treatments = listBuilder.build();
op = Operation.REMOVE_FROM_EXISTING;
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
return new DefaultNextObjective(this);
return removeFromExisting(null);
}
@Override
@ -341,6 +315,22 @@ public final class DefaultNextObjective implements NextObjective {
return new DefaultNextObjective(this);
}
// Convenience overload: builds a VERIFY objective without an objective
// context by delegating to verify(ObjectiveContext) with null.
@Override
public NextObjective verify() {
return verify(null);
}
// Builds a next objective whose operation is VERIFY, using the treatments
// accumulated so far and the supplied (possibly null) objective context.
// The builder fields are assigned before the argument checks run, so a
// failed check leaves the builder already switched to the VERIFY operation.
@Override
public NextObjective verify(ObjectiveContext context) {
// snapshot the treatments collected in listBuilder
treatments = listBuilder.build();
op = Operation.VERIFY;
this.context = context;
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
// note: an empty treatment list is not rejected here
return new DefaultNextObjective(this);
}
}
}

View File

@ -219,6 +219,22 @@ public interface NextObjective extends Objective {
*/
NextObjective removeFromExisting(ObjectiveContext context);
/**
* Builds the next objective that needs to be verified. Use
* {@code verify(ObjectiveContext)} instead when the calling application
* needs to be notified through an objective context.
*
* @return a next objective with {@link Operation} VERIFY
*/
NextObjective verify();
/**
* Builds the next objective that needs to be verified. The context will
* be used to notify the calling application.
*
* @param context an objective context
*        (NOTE(review): DefaultNextObjective's no-argument verify() passes
*        null here, so null is presumably acceptable - confirm)
* @return a next objective with {@link Operation} VERIFY
*/
NextObjective verify(ObjectiveContext context);
}
}

View File

@ -60,7 +60,20 @@ public interface Objective {
* Remove from an existing Next Objective. Should not be used for any
* other objective.
*/
REMOVE_FROM_EXISTING
REMOVE_FROM_EXISTING,
/**
* Verifies that an existing Next Objective's collection of treatments
* is correctly represented by the underlying implementation of the objective.
* Corrective action is taken if discrepancies are found during verification.
* For example, if the next objective defines 3 sets of treatments, which
* are meant to be implemented as 3 buckets in a group, but verification
* finds fewer or more buckets, then the appropriate buckets are added or
* removed to match the objective.
*
* Should not be used for any other objective.
*/
VERIFY
}
/**

View File

@ -66,6 +66,7 @@ import org.slf4j.Logger;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
@ -1031,9 +1032,10 @@ public class Ofdpa2GroupHandler {
if (label == -1) {
duplicateBuckets.add(trafficTreatment);
} else {
boolean exists = existingPortAndLabel(allActiveKeys, groupService,
deviceId, portNumber, label);
if (exists) {
List<Integer> existing = existingPortAndLabel(allActiveKeys,
groupService, deviceId,
portNumber, label);
if (!existing.isEmpty()) {
duplicateBuckets.add(trafficTreatment);
} else {
nonDuplicateBuckets.add(trafficTreatment);
@ -1316,12 +1318,14 @@ public class Ofdpa2GroupHandler {
}
/**
* Removes the bucket in the top level group of a possible group-chain. Does
* Removes buckets in the top level group of a possible group-chain. Does
* not remove the groups in the group-chain pointed to by this bucket, as they
* may be in use (referenced by other groups) elsewhere.
*
* @param nextObjective the bucket information for a next group
* @param next the representation of the existing group-chain for this next objective
* @param nextObjective a next objective that contains information for the
* buckets to be removed from the group
* @param next the representation of the existing group-chains for this next
* objective, from which the top-level buckets to remove are determined
*/
protected void removeBucketFromGroup(NextObjective nextObjective, NextGroup next) {
if (nextObjective.type() != NextObjective.Type.HASHED &&
@ -1331,55 +1335,47 @@ public class Ofdpa2GroupHandler {
fail(nextObjective, ObjectiveError.UNSUPPORTED);
return;
}
Set<PortNumber> portsToRemove = Sets.newHashSet();
Collection<TrafficTreatment> treatments = nextObjective.next();
for (TrafficTreatment treatment : treatments) {
// find the bucket to remove by noting the outport, and figuring out the
// top-level group in the group-chain that indirectly references the port
PortNumber portToRemove = readOutPortFromTreatment(treatment);
if (portToRemove == null) {
log.warn("treatment {} of next objective {} has no outport.. cannot remove bucket"
+ "from group in dev: {}", treatment, nextObjective.id(), deviceId);
} else {
portsToRemove.add(portToRemove);
}
}
if (portsToRemove.isEmpty()) {
log.warn("next objective {} has no outport.. cannot remove bucket"
+ "from group in dev: {}", nextObjective.id(), deviceId);
fail(nextObjective, ObjectiveError.BADPARAMS);
}
List<Deque<GroupKey>> allActiveKeys = appKryo.deserialize(next.data());
List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
for (Deque<GroupKey> gkeys : allActiveKeys) {
// last group in group chain should have a single bucket pointing to port
GroupKey groupWithPort = gkeys.peekLast();
Group group = groupService.getGroup(deviceId, groupWithPort);
if (group == null) {
log.warn("Inconsistent group chain found when removing bucket"
+ "for next:{} in dev:{}", nextObjective.id(), deviceId);
List<Integer> indicesToRemove = Lists.newArrayList();
for (TrafficTreatment treatment : nextObjective.next()) {
// find the top-level bucket in the group-chain by matching the
// outport and label from different groups in the chain
PortNumber portToRemove = readOutPortFromTreatment(treatment);
int labelToRemove = readLabelFromTreatment(treatment);
if (portToRemove == null) {
log.warn("treatment {} of next objective {} has no outport.. "
+ "cannot remove bucket from group in dev: {}", treatment,
nextObjective.id(), deviceId);
continue;
}
if (group.buckets().buckets().isEmpty()) {
log.warn("Can't get output port information from group {} " +
"because there is no bucket in the group.",
group.id().toString());
continue;
}
PortNumber pout = readOutPortFromTreatment(
group.buckets().buckets().get(0).treatment());
if (portsToRemove.contains(pout)) {
chainsToRemove.add(gkeys);
}
List<Integer> existing = existingPortAndLabel(allActiveKeys,
groupService, deviceId,
portToRemove, labelToRemove);
indicesToRemove.addAll(existing);
}
List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
indicesToRemove.forEach(index -> chainsToRemove
.add(allActiveKeys.get(index)));
if (chainsToRemove.isEmpty()) {
log.warn("Could not find appropriate group-chain for removing bucket"
+ " for next id {} in dev:{}", nextObjective.id(), deviceId);
fail(nextObjective, ObjectiveError.BADPARAMS);
return;
}
removeBucket(chainsToRemove, nextObjective);
}
/**
* Removes top-level buckets from a group that represents the given next objective.
*
* @param chainsToRemove a list of group bucket chains to remove
* @param nextObjective the next objective that contains information for the
* buckets to be removed from the group
*/
protected void removeBucket(List<Deque<GroupKey>> chainsToRemove,
NextObjective nextObjective) {
List<GroupBucket> bucketsToRemove = Lists.newArrayList();
//first group key is the one we want to modify
GroupKey modGroupKey = chainsToRemove.get(0).peekFirst();
@ -1387,15 +1383,15 @@ public class Ofdpa2GroupHandler {
for (Deque<GroupKey> foundChain : chainsToRemove) {
//second group key is the one we wish to remove the reference to
if (foundChain.size() < 2) {
// additional check to make sure second group key exist in
// additional check to make sure second group key exists in
// the chain.
log.warn("Can't find second group key from chain {}",
foundChain);
continue;
}
GroupKey pointedGroupKey = foundChain.stream().collect(Collectors.toList()).get(1);
GroupKey pointedGroupKey = foundChain.stream()
.collect(Collectors.toList()).get(1);
Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
if (pointedGroup == null) {
continue;
}
@ -1412,7 +1408,6 @@ public class Ofdpa2GroupHandler {
.group(pointedGroup.id())
.build());
}
bucketsToRemove.add(bucket);
}
@ -1437,15 +1432,20 @@ public class Ofdpa2GroupHandler {
groupService.removeBucketsFromGroup(deviceId, modGroupKey,
removeBuckets, modGroupKey,
nextObjective.appId());
// update store - synchronize access
// update store - synchronize access as there may be multiple threads
// trying to remove buckets from the same group, each with its own
// potentially stale copy of allActiveKeys
synchronized (flowObjectiveStore) {
// get fresh copy of what the store holds
next = flowObjectiveStore.getNextGroup(nextObjective.id());
allActiveKeys = appKryo.deserialize(next.data());
// get a fresh copy of what the store holds
NextGroup next = flowObjectiveStore.getNextGroup(nextObjective.id());
List<Deque<GroupKey>> allActiveKeys = appKryo.deserialize(next.data());
// Note that since we got a new object, and ArrayDeque does not implement
// Object.equals(), we have to check the deque last elems one by one
allActiveKeys.removeIf(active -> chainsToRemove.stream().anyMatch(remove ->
remove.peekLast().equals(active.peekLast())));
// Object.equals(), we have to check the deque elems one by one
allActiveKeys
.removeIf(active ->
chainsToRemove.stream().anyMatch(remove ->
Arrays.equals(remove.toArray(new GroupKey[0]),
active.toArray(new GroupKey[0]))));
// If no buckets in the group, then retain an entry for the
// top level group which still exists.
if (allActiveKeys.isEmpty()) {
@ -1454,7 +1454,8 @@ public class Ofdpa2GroupHandler {
allActiveKeys.add(top);
}
flowObjectiveStore.putNextGroup(nextObjective.id(),
new OfdpaNextGroup(allActiveKeys, nextObjective));
new OfdpaNextGroup(allActiveKeys,
nextObjective));
}
}
@ -1477,6 +1478,79 @@ public class Ofdpa2GroupHandler {
flowObjectiveStore.removeNextGroup(nextObjective.id());
}
/**
 * Checks existing buckets in {@link NextGroup} to verify if they match
 * the buckets in the given {@link NextObjective}. Adds or removes buckets
 * to ensure that the buckets match up.
 * <p>
 * Only HASHED next objectives are handled; any other type is failed as
 * UNSUPPORTED. For each treatment in the next objective, the existing
 * group-chains are searched for the treatment's outport and label:
 * <ul>
 *   <li>if no chain matches, the treatment is marked for creation;</li>
 *   <li>if more than one chain matches, the first match is kept and the
 *       remaining matches are marked for removal as duplicates.</li>
 * </ul>
 * This method only marks for removal the duplicates found this way; chains
 * that match none of the objective's treatments are not touched here.
 * A treatment without an outport fails the objective as BADPARAMS and
 * aborts the verification before any bucket is added or removed.
 *
 * @param nextObjective the next objective to verify
 * @param next the representation of the existing group which has to be
 *             modified to match the given next objective
 */
protected void verifyGroup(NextObjective nextObjective, NextGroup next) {
    if (nextObjective.type() != NextObjective.Type.HASHED) {
        log.warn("verification not supported for {} group", nextObjective.type());
        fail(nextObjective, ObjectiveError.UNSUPPORTED);
        return;
    }
    log.debug("Call to verify device:{} nextId:{}", deviceId, nextObjective.id());
    List<Deque<GroupKey>> allActiveKeys = appKryo.deserialize(next.data());
    List<TrafficTreatment> bucketsToCreate = Lists.newArrayList();
    List<Integer> indicesToRemove = Lists.newArrayList();

    // XXX verify empty group
    for (TrafficTreatment bkt : nextObjective.next()) {
        PortNumber portNumber = readOutPortFromTreatment(bkt);
        int label = readLabelFromTreatment(bkt);
        if (portNumber == null) {
            log.warn("treatment {} of next objective {} has no outport.. "
                    + "cannot verify bucket in group in dev: {}", bkt,
                    nextObjective.id(), deviceId);
            fail(nextObjective, ObjectiveError.BADPARAMS);
            return;
        }
        List<Integer> existing = existingPortAndLabel(allActiveKeys,
                                                      groupService, deviceId,
                                                      portNumber, label);
        if (existing.isEmpty()) {
            // if it doesn't exist, mark this bucket for creation
            bucketsToCreate.add(bkt);
            continue;
        }
        // if it exists but there are duplicates, keep the first match and
        // mark the others for removal. An index is recorded only once, even
        // when several treatments in the objective share the same outport
        // and label, so that the same chain is not handed to removeBucket
        // more than once.
        for (Integer duplicate : existing.subList(1, existing.size())) {
            if (!indicesToRemove.contains(duplicate)) {
                indicesToRemove.add(duplicate);
            }
        }
    }

    if (!bucketsToCreate.isEmpty()) {
        log.info("creating {} buckets as part of nextId: {} verification",
                 bucketsToCreate.size(), nextObjective.id());
        //create a nextObjective only with these buckets
        NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
                .withId(nextObjective.id())
                .withType(NextObjective.Type.HASHED)
                .withMeta(nextObjective.meta())
                .fromApp(nextObjective.appId());
        bucketsToCreate.forEach(nextObjBuilder::addTreatment);
        addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys);
    }

    if (!indicesToRemove.isEmpty()) {
        log.info("removing {} buckets as part of nextId: {} verification",
                 indicesToRemove.size(), nextObjective.id());
        // translate the recorded indices back into the group-chains they
        // refer to in the copy of allActiveKeys deserialized above
        List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
        indicesToRemove.forEach(index -> chainsToRemove
                .add(allActiveKeys.get(index)));
        removeBucket(chainsToRemove, nextObjective);
    }

    pass(nextObjective);
}
//////////////////////////////////////
// Helper methods and classes
//////////////////////////////////////
protected void updatePendingNextObjective(GroupKey groupKey, OfdpaNextGroup nextGrp) {
pendingAddNextObjectives.asMap().compute(groupKey, (k, val) -> {
if (val == null) {

View File

@ -327,6 +327,16 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline
nextObjective.id(), deviceId);
groupHandler.removeBucketFromGroup(nextObjective, nextGroup);
break;
case VERIFY:
if (nextGroup == null) {
log.warn("Cannot verify next {} that does not exist in device {}",
nextObjective.id(), deviceId);
return;
}
log.debug("Processing NextObjective id {} in dev {} - verify",
nextObjective.id(), deviceId);
groupHandler.verifyGroup(nextObjective, nextGroup);
break;
default:
log.warn("Unsupported operation {}", nextObjective.op());
}

View File

@ -41,6 +41,7 @@ import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
@ -213,22 +214,28 @@ public final class OfdpaGroupHandlerUtility {
}
/**
* Returns true if the group represented by allActiveKeys contains a bucket
* (group-chain) with actions that match the given outport and label.
* Returns a list of all indices in the allActiveKeys list (that represents
* a group) if the list element (a bucket or group-chain) has treatments
* that match the given outport and label.
*
* @param allActiveKeys the representation of the group
* @param groupService groups service for querying group information
* @param deviceId the device id for the device that contains the group
* @param portToMatch the port to match in the group buckets
* @param labelToMatch the MPLS label-id to match in the group buckets
* @return true if a bucket (group-chain) is found with actions that match
* the given portToMatch and labelToMatch
* @return a list of indexes in the allActiveKeys list where the list element
* has treatments that match the given portToMatch and labelToMatch.
* Could be empty if no list elements were found to match the given
* port and label.
*/
public static boolean existingPortAndLabel(List<Deque<GroupKey>> allActiveKeys,
public static List<Integer> existingPortAndLabel(
List<Deque<GroupKey>> allActiveKeys,
GroupService groupService,
DeviceId deviceId,
PortNumber portToMatch,
int labelToMatch) {
List<Integer> indices = new ArrayList<>();
int index = 0;
for (Deque<GroupKey> keyChain : allActiveKeys) {
GroupKey ifaceGroupKey = keyChain.peekLast();
Group ifaceGroup = groupService.getGroup(deviceId, ifaceGroupKey);
@ -245,14 +252,15 @@ public final class OfdpaGroupHandlerUtility {
secondGroup.buckets().buckets()
.iterator().next().treatment());
if (label == labelToMatch) {
return true;
indices.add(index);
}
}
}
}
index++;
}
return false;
return indices;
}
/**