Adding purgeOnDisconnect support to the meter subsystem and adding vlanId match

criteria to EAPOL trap flows.

* Adding purgeOnDisconnect property to MeterManager
* DeviceListener implementation on MeterManager
* Adding purgeMeter(DeviceId deviceId) method to MeterStore
* Calling the above method when DEVICE_AVAILABILITY_CHANGE is received
* Adding vlanId match criteria to EAPOL trap flows (OltPipeline change)

Change-Id: Ibb254302efe94edf1fd596f74a6eef6587410475
(cherry picked from commit 91b38543d822a0d9d092f9b3ff7760b1a206226a)
This commit is contained in:
Gamze Abaka 2019-03-11 06:52:48 +00:00 committed by Carmelo Cascone
parent effadedbe9
commit f57ef606fc
5 changed files with 84 additions and 17 deletions

View File

@ -102,13 +102,14 @@ public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
* Notifies the delegate that the meter failed to allow it * Notifies the delegate that the meter failed to allow it
* to nofity the app. * to nofity the app.
* *
* @param op a failed meter operation * @param op a failed meter operation
* @param reason a failure reason * @param reason a failure reason
*/ */
void failedMeter(MeterOperation op, MeterFailReason reason); void failedMeter(MeterOperation op, MeterFailReason reason);
/** /**
* Delete this meter immediately. * Delete this meter immediately.
*
* @param m a meter * @param m a meter
*/ */
void deleteMeterNow(Meter m); void deleteMeterNow(Meter m);
@ -134,8 +135,15 @@ public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
* Frees the given meter id. * Frees the given meter id.
* *
* @param deviceId the device id * @param deviceId the device id
* @param meterId the id to be freed * @param meterId the id to be freed
*/ */
void freeMeterId(DeviceId deviceId, MeterId meterId); void freeMeterId(DeviceId deviceId, MeterId meterId);
/**
* Removes all meters of given device from store.
*
* @param deviceId the device id
*/
void purgeMeter(DeviceId deviceId);
} }

View File

@ -119,6 +119,9 @@ public final class OsgiPropertyConstants {
public static final String MM_FALLBACK_METER_POLL_FREQUENCY = "fallbackMeterPollFrequency"; public static final String MM_FALLBACK_METER_POLL_FREQUENCY = "fallbackMeterPollFrequency";
public static final int MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT = 30; public static final int MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT = 30;
public static final String MM_PURGE_ON_DISCONNECTION = "purgeOnDisconnection";
public static final boolean MM_PURGE_ON_DISCONNECTION_DEFAULT = false;
public static final String NRM_ARP_ENABLED = "arpEnabled"; public static final String NRM_ARP_ENABLED = "arpEnabled";
public static final boolean NRM_ARP_ENABLED_DEFAULT = true; public static final boolean NRM_ARP_ENABLED_DEFAULT = true;

View File

@ -15,10 +15,13 @@
*/ */
package org.onosproject.net.meter.impl; package org.onosproject.net.meter.impl;
import org.onlab.util.Tools;
import org.onlab.util.TriConsumer; import org.onlab.util.TriConsumer;
import org.onosproject.cfg.ComponentConfigService; import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.mastership.MastershipService; import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId; import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService; import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.DriverService; import org.onosproject.net.driver.DriverService;
import org.onosproject.net.meter.DefaultMeter; import org.onosproject.net.meter.DefaultMeter;
@ -66,21 +69,24 @@ import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_F
import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT; import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT;
import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS; import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS;
import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS_DEFAULT; import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS_DEFAULT;
import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION;
import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
/** /**
* Provides implementation of the meter service APIs. * Provides implementation of the meter service APIs.
*/ */
@Component( @Component(
immediate = true, immediate = true,
service = { service = {
MeterService.class, MeterService.class,
MeterProviderRegistry.class MeterProviderRegistry.class
}, },
property = { property = {
MM_NUM_THREADS + ":Integer=" + MM_NUM_THREADS_DEFAULT, MM_NUM_THREADS + ":Integer=" + MM_NUM_THREADS_DEFAULT,
MM_FALLBACK_METER_POLL_FREQUENCY + ":Integer=" + MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT MM_FALLBACK_METER_POLL_FREQUENCY + ":Integer=" + MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT,
} MM_PURGE_ON_DISCONNECTION + ":Boolean=" + MM_PURGE_ON_DISCONNECTION_DEFAULT,
}
) )
public class MeterManager public class MeterManager
extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService> extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService>
@ -91,6 +97,7 @@ public class MeterManager
private final Logger log = getLogger(getClass()); private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate(); private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
private final DeviceListener deviceListener = new InternalDeviceListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY) @Reference(cardinality = ReferenceCardinality.MANDATORY)
private MeterStore store; private MeterStore store;
@ -113,6 +120,9 @@ public class MeterManager
/** Frequency (in seconds) for polling meters via fallback provider. */ /** Frequency (in seconds) for polling meters via fallback provider. */
private int fallbackMeterPollFrequency = MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT; private int fallbackMeterPollFrequency = MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT;
/** Purge entries associated with a device when the device goes offline. */
private boolean purgeOnDisconnection = MM_PURGE_ON_DISCONNECTION_DEFAULT;
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete; private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
private ExecutorService executorService; private ExecutorService executorService;
@ -124,6 +134,7 @@ public class MeterManager
store.setDelegate(delegate); store.setDelegate(delegate);
cfgService.registerProperties(getClass()); cfgService.registerProperties(getClass());
eventDispatcher.addSink(MeterEvent.class, listenerRegistry); eventDispatcher.addSink(MeterEvent.class, listenerRegistry);
deviceService.addListener(deviceListener);
onComplete = (request, result, error) -> { onComplete = (request, result, error) -> {
request.context().ifPresent(c -> { request.context().ifPresent(c -> {
@ -160,6 +171,7 @@ public class MeterManager
defaultProvider.terminate(); defaultProvider.terminate();
store.unsetDelegate(delegate); store.unsetDelegate(delegate);
eventDispatcher.removeSink(MeterEvent.class); eventDispatcher.removeSink(MeterEvent.class);
deviceService.removeListener(deviceListener);
cfgService.unregisterProperties(getClass(), false); cfgService.unregisterProperties(getClass(), false);
executorService.shutdown(); executorService.shutdown();
log.info("Stopped"); log.info("Stopped");
@ -172,6 +184,17 @@ public class MeterManager
*/ */
private void readComponentConfiguration(ComponentContext context) { private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties(); Dictionary<?, ?> properties = context.getProperties();
Boolean flag;
flag = Tools.isPropertyEnabled(properties, MM_PURGE_ON_DISCONNECTION);
if (flag == null) {
log.info("PurgeOnDisconnection is not configured," +
"using current value of {}", purgeOnDisconnection);
} else {
purgeOnDisconnection = flag;
log.info("Configured. PurgeOnDisconnection is {}",
purgeOnDisconnection ? "enabled" : "disabled");
}
String s = get(properties, MM_FALLBACK_METER_POLL_FREQUENCY); String s = get(properties, MM_FALLBACK_METER_POLL_FREQUENCY);
try { try {
@ -410,4 +433,24 @@ public class MeterManager
} }
} }
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_REMOVED:
case DEVICE_AVAILABILITY_CHANGED:
DeviceId deviceId = event.subject().id();
if (!deviceService.isAvailable(deviceId)) {
if (purgeOnDisconnection) {
store.purgeMeter(deviceId);
}
}
break;
default:
break;
}
}
}
} }

View File

@ -66,7 +66,9 @@ import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -334,6 +336,19 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m)); notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
} }
@Override
public void purgeMeter(DeviceId deviceId) {
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
metersPendingRemove.forEach(versionedMeterKey
-> deleteMeterNow(versionedMeterKey.value().meter()));
}
@Override @Override
public long getMaxMeters(MeterFeaturesKey key) { public long getMaxMeters(MeterFeaturesKey key) {
MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null); MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);

View File

@ -795,7 +795,9 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
Instruction meter = filter.meta().metered(); Instruction meter = filter.meta().metered();
Instruction writeMetadata = filter.meta().writeMetadata(); Instruction writeMetadata = filter.meta().writeMetadata();
TrafficSelector selector = buildSelector(filter.key(), ethType); Criterion vlanId = filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
TrafficSelector selector = buildSelector(filter.key(), ethType, vlanId);
TrafficTreatment treatment = buildTreatment(output, meter, writeMetadata); TrafficTreatment treatment = buildTreatment(output, meter, writeMetadata);
buildAndApplyRule(filter, selector, treatment); buildAndApplyRule(filter, selector, treatment);
@ -904,19 +906,15 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
private TrafficSelector buildSelector(Criterion... criteria) { private TrafficSelector buildSelector(Criterion... criteria) {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder(); TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
for (Criterion c : criteria) { Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
sBuilder.add(c);
}
return sBuilder.build(); return sBuilder.build();
} }
private TrafficTreatment buildTreatment(Instruction... instructions) { private TrafficTreatment buildTreatment(Instruction... instructions) {
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add); Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);