diff --git a/apps/olt/src/main/java/org/onosproject/olt/Olt.java b/apps/olt/src/main/java/org/onosproject/olt/Olt.java index fdf82a00f5..d3120ddb54 100644 --- a/apps/olt/src/main/java/org/onosproject/olt/Olt.java +++ b/apps/olt/src/main/java/org/onosproject/olt/Olt.java @@ -15,6 +15,8 @@ */ package org.onosproject.olt; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -52,6 +54,7 @@ import org.slf4j.Logger; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -89,11 +92,16 @@ public class Olt private static final VlanId DEFAULT_VLAN = VlanId.vlanId((short) 0); private ExecutorService oltInstallers = Executors.newFixedThreadPool(4, - groupedThreads("onos/olt-service", - "olt-installer-%d")); + groupedThreads("onos/olt-service", + "olt-installer-%d")); private Map oltData = new ConcurrentHashMap<>(); + private Map> objectives = + Maps.newConcurrentMap(); + + private Map subscribers = Maps.newConcurrentMap(); + private InternalNetworkConfigListener configListener = new InternalNetworkConfigListener(); private static final Class CONFIG_CLASS = @@ -102,11 +110,12 @@ public class Olt private ConfigFactory configFactory = new ConfigFactory( SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") { - @Override - public AccessDeviceConfig createConfig() { - return new AccessDeviceConfig(); - } - }; + @Override + public AccessDeviceConfig createConfig() { + return new AccessDeviceConfig(); + } + }; + @Activate public void activate() { @@ -152,7 +161,68 @@ public class Olt @Override public void removeSubscriber(ConnectPoint port) { - throw new UnsupportedOperationException(); + AccessDeviceData olt = oltData.get(port.deviceId()); + + if (olt == null) { + log.warn("No data found for OLT device {}", port.deviceId()); + return; + } + + unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), olt.vlan()); + + } + + private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink, + PortNumber subscriberPort, VlanId deviceVlan) { + + //FIXME: This method is slightly ugly but it'll do until we have a better + // way to remove flows from the flow store. + + CompletableFuture downFuture = new CompletableFuture(); + CompletableFuture upFuture = new CompletableFuture(); + + ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort); + + VlanId subscriberVlan = subscribers.remove(cp); + + Set fwds = objectives.remove(cp); + + if (fwds == null || fwds.size() != 2) { + log.warn("Unknown or incomplete subscriber at {}", cp); + return; + } + + + fwds.stream().forEach( + fwd -> flowObjectiveService.forward(deviceId, + fwd.remove(new ObjectiveContext() { + @Override + public void onSuccess(Objective objective) { + upFuture.complete(null); + } + + @Override + public void onError(Objective objective, ObjectiveError error) { + upFuture.complete(error); + } + }))); + + upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> { + if (upStatus == null && downStatus == null) { + post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNREGISTERED, + deviceId, + deviceVlan, + subscriberVlan)); + } else if (downStatus != null) { + log.error("Subscriber with vlan {} on device {} " + + "on port {} failed downstream uninstallation: {}", + subscriberVlan, deviceId, subscriberPort, downStatus); + } else if (upStatus != null) { + log.error("Subscriber with vlan {} on device {} " + + "on port {} failed upstream uninstallation: {}", + subscriberVlan, deviceId, subscriberPort, upStatus); + } + }, oltInstallers); } @@ -190,46 +260,53 @@ public class Olt .build(); - ForwardingObjective upFwd = DefaultForwardingObjective.builder() + ForwardingObjective.Builder upFwd = DefaultForwardingObjective.builder() .withFlag(ForwardingObjective.Flag.VERSATILE) .withPriority(1000) .makePermanent() .withSelector(upstream) .fromApp(appId) - .withTreatment(upstreamTreatment) - .add(new ObjectiveContext() { - @Override - public void onSuccess(Objective objective) { - upFuture.complete(null); - } + .withTreatment(upstreamTreatment); - @Override - public void onError(Objective objective, ObjectiveError error) { - upFuture.complete(error); - } - }); - ForwardingObjective downFwd = DefaultForwardingObjective.builder() + ForwardingObjective.Builder downFwd = DefaultForwardingObjective.builder() .withFlag(ForwardingObjective.Flag.VERSATILE) .withPriority(1000) .makePermanent() .withSelector(downstream) .fromApp(appId) - .withTreatment(downstreamTreatment) - .add(new ObjectiveContext() { - @Override - public void onSuccess(Objective objective) { - downFuture.complete(null); - } + .withTreatment(downstreamTreatment); - @Override - public void onError(Objective objective, ObjectiveError error) { - downFuture.complete(error); - } - }); + ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort); - flowObjectiveService.forward(deviceId, upFwd); - flowObjectiveService.forward(deviceId, downFwd); + subscribers.put(cp, subscriberVlan); + objectives.put(cp, Sets.newHashSet(upFwd, downFwd)); + + + flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() { + @Override + public void onSuccess(Objective objective) { + upFuture.complete(null); + } + + @Override + public void onError(Objective objective, ObjectiveError error) { + upFuture.complete(error); + } + })); + + + flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() { + @Override + public void onSuccess(Objective objective) { + downFuture.complete(null); + } + + @Override + public void onError(Objective objective, ObjectiveError error) { + downFuture.complete(error); + } + })); upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> { if (upStatus == null && downStatus == null) { @@ -288,20 +365,20 @@ public class Olt public void event(NetworkConfigEvent event) { switch (event.type()) { - case CONFIG_ADDED: - case CONFIG_UPDATED: - if (event.configClass().equals(CONFIG_CLASS)) { - AccessDeviceConfig config = - networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS); - if (config != null) { - oltData.put(config.getOlt().deviceId(), config.getOlt()); + case CONFIG_ADDED: + case CONFIG_UPDATED: + if (event.configClass().equals(CONFIG_CLASS)) { + AccessDeviceConfig config = + networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS); + if (config != null) { + oltData.put(config.getOlt().deviceId(), config.getOlt()); + } } - } - break; - case CONFIG_UNREGISTERED: - case CONFIG_REMOVED: - default: - break; + break; + case CONFIG_UNREGISTERED: + case CONFIG_REMOVED: + default: + break; } } }