diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java index f9557068df..4bc9536dd9 100644 --- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java +++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java @@ -19,11 +19,15 @@ package org.onosproject.drivers.p4runtime.mirror; import com.google.common.annotations.Beta; import com.google.common.collect.Maps; import org.onlab.util.KryoNamespace; +import org.onlab.util.SharedExecutors; import org.onosproject.net.Annotations; import org.onosproject.net.DeviceId; import org.onosproject.net.pi.runtime.PiEntity; import org.onosproject.net.pi.runtime.PiEntityType; import org.onosproject.net.pi.runtime.PiHandle; +import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent; +import org.onosproject.net.pi.service.PiPipeconfWatchdogListener; +import org.onosproject.net.pi.service.PiPipeconfWatchdogService; import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest; import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest; import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse; @@ -39,10 +43,12 @@ import org.slf4j.Logger; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; +import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY; import static org.slf4j.LoggerFactory.getLogger; /** @@ -62,13 +68,28 @@ public abstract class AbstractDistributedP4RuntimeMirror @Reference(cardinality = ReferenceCardinality.MANDATORY) protected StorageService storageService; + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected PiPipeconfWatchdogService pipeconfWatchdogService; + private EventuallyConsistentMap> mirrorMap; private EventuallyConsistentMap annotationsMap; private final PiEntityType entityType; + private final boolean flushOnPipelineUnknown; + + private final PiPipeconfWatchdogListener pipeconfListener = + new InternalPipeconfWatchdogListener(); + AbstractDistributedP4RuntimeMirror(PiEntityType entityType) { this.entityType = entityType; + this.flushOnPipelineUnknown = false; + } + + AbstractDistributedP4RuntimeMirror(PiEntityType entityType, + boolean flushOnPipelineUnknown) { + this.entityType = entityType; + this.flushOnPipelineUnknown = flushOnPipelineUnknown; } @Activate @@ -94,11 +115,13 @@ public abstract class AbstractDistributedP4RuntimeMirror .withTimestampProvider((k, v) -> new WallClockTimestamp()) .build(); + pipeconfWatchdogService.addListener(pipeconfListener); log.info("Started"); } @Deactivate public void deactivate() { + pipeconfWatchdogService.removeListener(pipeconfListener); mirrorMap.destroy(); mirrorMap = null; log.info("Stopped"); @@ -123,6 +146,15 @@ public abstract class AbstractDistributedP4RuntimeMirror public void put(H handle, E entry) { checkNotNull(handle); checkNotNull(entry); + final PiPipeconfWatchdogService.PipelineStatus status = + pipeconfWatchdogService.getStatus(handle.deviceId()); + if (flushOnPipelineUnknown && !status.equals(READY)) { + // Keep mirror empty if pipeline status is UNKNOWN. + log.info("Ignoring {} mirror update because pipeline " + + "status of {} is {}: {}", + entityType, handle.deviceId(), status, entry); + return; + } final long now = new WallClockTimestamp().unixTimestamp(); final TimedEntry timedEntry = new TimedEntry<>(now, entry); mirrorMap.put(handle, timedEntry); @@ -190,6 +222,12 @@ public abstract class AbstractDistributedP4RuntimeMirror } } + private Set getHandlesForDevice(DeviceId deviceId) { + return mirrorMap.keySet().stream() + .filter(h -> h.deviceId().equals(deviceId)) + .collect(Collectors.toSet()); + } + private Map deviceHandleMap(DeviceId deviceId) { final Map deviceMap = Maps.newHashMap(); mirrorMap.entrySet().stream() @@ -198,6 +236,14 @@ public abstract class AbstractDistributedP4RuntimeMirror return deviceMap; } + + private void removeAll(DeviceId deviceId) { + checkNotNull(deviceId); + @SuppressWarnings("unchecked") + Collection handles = (Collection) getHandlesForDevice(deviceId); + handles.forEach(this::remove); + } + @Override public void applyWriteRequest(WriteRequest request) { // Optimistically assume all requests will be successful. @@ -228,4 +274,20 @@ public abstract class AbstractDistributedP4RuntimeMirror } }); } + + public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener { + @Override + public void event(PiPipeconfWatchdogEvent event) { + log.debug("Flushing mirror for {}, pipeline status is {}", + event.subject(), event.type()); + SharedExecutors.getPoolThreadExecutor().execute( + () -> removeAll(event.subject())); + } + + @Override + public boolean isRelevant(PiPipeconfWatchdogEvent event) { + return flushOnPipelineUnknown && + event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN); + } + } } diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java index 4876aaa4dc..35f209e162 100644 --- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java +++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java @@ -30,6 +30,9 @@ public final class DistributedP4RuntimePreEntryMirror implements P4RuntimePreEntryMirror { public DistributedP4RuntimePreEntryMirror() { - super(PiEntityType.PRE_ENTRY); + // PI does not support reading PRE entries. To avoid inconsistencies, + // flush mirror on device disconnection and other events which + // invalidate pipeline status. + super(PiEntityType.PRE_ENTRY, true); } }