Fix missing P4Runtime multicast/clone groups after device reboot

Change-Id: Ica1cef51aa4bd1c83486d92fcc59f32da9567acd
(cherry picked from commit 14301075549fba5568d8b6b7e32602b5452577d2)
This commit is contained in:
Carmelo Cascone 2019-04-28 12:45:56 -07:00 committed by Ray Milkey
parent 69b2fed517
commit d6191ec4aa
2 changed files with 66 additions and 1 deletions

View File

@ -19,11 +19,15 @@ package org.onosproject.drivers.p4runtime.mirror;
import com.google.common.annotations.Beta;
import com.google.common.collect.Maps;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SharedExecutors;
import org.onosproject.net.Annotations;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.runtime.PiEntity;
import org.onosproject.net.pi.runtime.PiEntityType;
import org.onosproject.net.pi.runtime.PiHandle;
import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
@ -39,10 +43,12 @@ import org.slf4j.Logger;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
import static org.slf4j.LoggerFactory.getLogger;
/**
@ -62,13 +68,28 @@ public abstract class AbstractDistributedP4RuntimeMirror
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected PiPipeconfWatchdogService pipeconfWatchdogService;
private EventuallyConsistentMap<PiHandle, TimedEntry<E>> mirrorMap;
private EventuallyConsistentMap<PiHandle, Annotations> annotationsMap;
private final PiEntityType entityType;
private final boolean flushOnPipelineUnknown;
private final PiPipeconfWatchdogListener pipeconfListener =
new InternalPipeconfWatchdogListener();
AbstractDistributedP4RuntimeMirror(PiEntityType entityType) {
this.entityType = entityType;
this.flushOnPipelineUnknown = false;
}
AbstractDistributedP4RuntimeMirror(PiEntityType entityType,
boolean flushOnPipelineUnknown) {
this.entityType = entityType;
this.flushOnPipelineUnknown = flushOnPipelineUnknown;
}
@Activate
@ -94,11 +115,13 @@ public abstract class AbstractDistributedP4RuntimeMirror
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
pipeconfWatchdogService.addListener(pipeconfListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
pipeconfWatchdogService.removeListener(pipeconfListener);
mirrorMap.destroy();
mirrorMap = null;
log.info("Stopped");
@ -123,6 +146,15 @@ public abstract class AbstractDistributedP4RuntimeMirror
public void put(H handle, E entry) {
checkNotNull(handle);
checkNotNull(entry);
final PiPipeconfWatchdogService.PipelineStatus status =
pipeconfWatchdogService.getStatus(handle.deviceId());
if (flushOnPipelineUnknown && !status.equals(READY)) {
// Keep mirror empty if pipeline status is UNKNOWN.
log.info("Ignoring {} mirror update because pipeline " +
"status of {} is {}: {}",
entityType, handle.deviceId(), status, entry);
return;
}
final long now = new WallClockTimestamp().unixTimestamp();
final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
mirrorMap.put(handle, timedEntry);
@ -190,6 +222,12 @@ public abstract class AbstractDistributedP4RuntimeMirror
}
}
private Set<PiHandle> getHandlesForDevice(DeviceId deviceId) {
return mirrorMap.keySet().stream()
.filter(h -> h.deviceId().equals(deviceId))
.collect(Collectors.toSet());
}
private Map<PiHandle, E> deviceHandleMap(DeviceId deviceId) {
final Map<PiHandle, E> deviceMap = Maps.newHashMap();
mirrorMap.entrySet().stream()
@ -198,6 +236,14 @@ public abstract class AbstractDistributedP4RuntimeMirror
return deviceMap;
}
private void removeAll(DeviceId deviceId) {
checkNotNull(deviceId);
@SuppressWarnings("unchecked")
Collection<H> handles = (Collection<H>) getHandlesForDevice(deviceId);
handles.forEach(this::remove);
}
@Override
public void applyWriteRequest(WriteRequest request) {
// Optimistically assume all requests will be successful.
@ -228,4 +274,20 @@ public abstract class AbstractDistributedP4RuntimeMirror
}
});
}
public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
@Override
public void event(PiPipeconfWatchdogEvent event) {
log.debug("Flushing mirror for {}, pipeline status is {}",
event.subject(), event.type());
SharedExecutors.getPoolThreadExecutor().execute(
() -> removeAll(event.subject()));
}
@Override
public boolean isRelevant(PiPipeconfWatchdogEvent event) {
return flushOnPipelineUnknown &&
event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
}
}
}

View File

@ -30,6 +30,9 @@ public final class DistributedP4RuntimePreEntryMirror
implements P4RuntimePreEntryMirror {
public DistributedP4RuntimePreEntryMirror() {
super(PiEntityType.PRE_ENTRY);
// PI does not support reading PRE entries. To avoid inconsistencies,
// flush mirror on device disconnection and other events which
// invalidate pipeline status.
super(PiEntityType.PRE_ENTRY, true);
}
}