mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-10-17 02:11:38 +02:00
[ONOS-5936] (vCore) Virtual FlowObjective Manager and Store
Changes 1. FlowObjective manager for virtual network is added 2. VirtualFlowObjective store is added 3. SimpleVirtualFlowObjectiveStore is implementation 4. Unit tests are added Change-Id: I18ff1d440d1f85ca96fff36a33a8b67566031e2c
This commit is contained in:
parent
0aba3e8a06
commit
86bebed7ca
@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.incubator.net.virtual;
|
||||
|
||||
import org.onosproject.net.behaviour.NextGroup;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
|
||||
import org.onosproject.net.flowobjective.ObjectiveEvent;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The flow objective store for virtual networks.
|
||||
*/
|
||||
public interface VirtualNetworkFlowObjectiveStore
|
||||
extends VirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate> {
|
||||
|
||||
/**
|
||||
* Adds a NextGroup to the store, by mapping it to the nextId as key,
|
||||
* and replacing any previous mapping.
|
||||
*
|
||||
* @param networkId a virtual network identifier
|
||||
* @param nextId an integer
|
||||
* @param group a next group opaque object
|
||||
*/
|
||||
void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group);
|
||||
|
||||
/**
|
||||
* Fetch a next group from the store.
|
||||
*
|
||||
* @param networkId a virtual network identifier
|
||||
* @param nextId an integer used as key
|
||||
* @return a next group, or null if group was not found
|
||||
*/
|
||||
NextGroup getNextGroup(NetworkId networkId, Integer nextId);
|
||||
|
||||
/**
|
||||
* Remove a next group mapping from the store.
|
||||
*
|
||||
* @param networkId a virtual network identifier
|
||||
* @param nextId the key to remove from the store.
|
||||
* @return the next group which mapped to the nextId and is now removed, or
|
||||
* null if no group mapping existed in the store
|
||||
*/
|
||||
NextGroup removeNextGroup(NetworkId networkId, Integer nextId);
|
||||
|
||||
/**
|
||||
* Fetch all groups from the store and their mapping to nextIds.
|
||||
*
|
||||
* @param networkId a virtual network identifier
|
||||
* @return a map that represents the current snapshot of Next-ids to NextGroups
|
||||
*/
|
||||
Map<Integer, NextGroup> getAllGroups(NetworkId networkId);
|
||||
|
||||
/**
|
||||
* Allocates a next objective id. This id is globally unique.
|
||||
*
|
||||
* @param networkId a virtual network identifier
|
||||
* @return an integer
|
||||
*/
|
||||
int allocateNextId(NetworkId networkId);
|
||||
}
|
@ -0,0 +1,628 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.incubator.net.virtual.impl;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalCause;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.onlab.osgi.ServiceDirectory;
|
||||
import org.onlab.util.KryoNamespace;
|
||||
import org.onosproject.incubator.net.virtual.AbstractVnetService;
|
||||
import org.onosproject.incubator.net.virtual.NetworkId;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkService;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.behaviour.NextGroup;
|
||||
import org.onosproject.net.behaviour.Pipeliner;
|
||||
import org.onosproject.net.behaviour.PipelinerContext;
|
||||
import org.onosproject.net.device.DeviceService;
|
||||
import org.onosproject.net.driver.AbstractHandlerBehaviour;
|
||||
import org.onosproject.net.flow.DefaultFlowRule;
|
||||
import org.onosproject.net.flow.DefaultTrafficSelector;
|
||||
import org.onosproject.net.flow.DefaultTrafficTreatment;
|
||||
import org.onosproject.net.flow.FlowRule;
|
||||
import org.onosproject.net.flow.FlowRuleOperations;
|
||||
import org.onosproject.net.flow.FlowRuleOperationsContext;
|
||||
import org.onosproject.net.flow.FlowRuleService;
|
||||
import org.onosproject.net.flow.TrafficSelector;
|
||||
import org.onosproject.net.flow.TrafficTreatment;
|
||||
import org.onosproject.net.flowobjective.FilteringObjective;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveService;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveStore;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
|
||||
import org.onosproject.net.flowobjective.ForwardingObjective;
|
||||
import org.onosproject.net.flowobjective.NextObjective;
|
||||
import org.onosproject.net.flowobjective.Objective;
|
||||
import org.onosproject.net.flowobjective.ObjectiveError;
|
||||
import org.onosproject.net.flowobjective.ObjectiveEvent;
|
||||
import org.onosproject.net.group.DefaultGroupKey;
|
||||
import org.onosproject.net.group.GroupKey;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
|
||||
import static org.onlab.util.Tools.groupedThreads;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
/**
|
||||
* Provides implementation of the flow objective programming service for virtual networks.
|
||||
*/
|
||||
// NOTE: This manager is designed to provide flow objective programming service
|
||||
// for virtual networks. Actually, virtual networks don't need to consider
|
||||
// the different implementation of data-path pipeline. But, the interfaces
|
||||
// and usages of flow objective service are still valuable for virtual network.
|
||||
// This manager is working as an interpreter from FlowObjective to FlowRules
|
||||
// to provide symmetric interfaces with ONOS core services.
|
||||
// The behaviours are based on DefaultSingleTablePipeline.
|
||||
|
||||
public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
|
||||
implements FlowObjectiveService {
|
||||
|
||||
public static final int INSTALL_RETRY_ATTEMPTS = 5;
|
||||
public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
protected DeviceService deviceService;
|
||||
|
||||
// Note: The following dependencies are added on behalf of the pipeline
|
||||
// driver behaviours to assure these services are available for their
|
||||
// initialization.
|
||||
protected FlowRuleService flowRuleService;
|
||||
|
||||
protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
|
||||
protected FlowObjectiveStore flowObjectiveStore;
|
||||
private final FlowObjectiveStoreDelegate delegate;
|
||||
|
||||
private final PipelinerContext context = new InnerPipelineContext();
|
||||
|
||||
private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
|
||||
private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
|
||||
|
||||
// local store to track which nextObjectives were sent to which device
|
||||
// for debugging purposes
|
||||
private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
|
||||
NetworkId networkId) {
|
||||
super(manager, networkId);
|
||||
|
||||
deviceService = manager.get(networkId(), DeviceService.class);
|
||||
flowRuleService = manager.get(networkId(), FlowRuleService.class);
|
||||
|
||||
executorService = newFixedThreadPool(4, groupedThreads("onos/virtual/objective-installer", "%d", log));
|
||||
|
||||
virtualFlowObjectiveStore =
|
||||
serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
|
||||
delegate = new InternalStoreDelegate();
|
||||
virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
|
||||
flowObjectiveStore = new StoreConvertor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
|
||||
executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
|
||||
if (queueObjective(deviceId, forwardingObjective)) {
|
||||
return;
|
||||
}
|
||||
executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void next(DeviceId deviceId, NextObjective nextObjective) {
|
||||
nextToDevice.put(nextObjective.id(), deviceId);
|
||||
executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int allocateNextId() {
|
||||
return flowObjectiveStore.allocateNextId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initPolicy(String policy) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getNextMappings() {
|
||||
List<String> mappings = new ArrayList<>();
|
||||
Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
|
||||
// XXX if the NextGroup after de-serialization actually stored info of the deviceId
|
||||
// then info on any nextObj could be retrieved from one controller instance.
|
||||
// Right now the drivers on one instance can only fetch for next-ids that came
|
||||
// to them.
|
||||
// Also, we still need to send the right next-id to the right driver as potentially
|
||||
// there can be different drivers for different devices. But on that account,
|
||||
// no instance should be decoding for another instance's nextIds.
|
||||
|
||||
for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
|
||||
// get the device this next Objective was sent to
|
||||
DeviceId deviceId = nextToDevice.get(e.getKey());
|
||||
mappings.add("NextId " + e.getKey() + ": " +
|
||||
((deviceId != null) ? deviceId : "nextId not in this onos instance"));
|
||||
if (deviceId != null) {
|
||||
// this instance of the controller sent the nextObj to a driver
|
||||
Pipeliner pipeliner = getDevicePipeliner(deviceId);
|
||||
List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
|
||||
if (nextMappings != null) {
|
||||
mappings.addAll(nextMappings);
|
||||
}
|
||||
}
|
||||
}
|
||||
return mappings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPendingNexts() {
|
||||
List<String> pendingNexts = new ArrayList<>();
|
||||
for (Integer nextId : pendingForwards.keySet()) {
|
||||
Set<PendingNext> pnext = pendingForwards.get(nextId);
|
||||
StringBuilder pend = new StringBuilder();
|
||||
pend.append("Next Id: ").append(Integer.toString(nextId))
|
||||
.append(" :: ");
|
||||
for (PendingNext pn : pnext) {
|
||||
pend.append(Integer.toString(pn.forwardingObjective().id()))
|
||||
.append(" ");
|
||||
}
|
||||
pendingNexts.add(pend.toString());
|
||||
}
|
||||
return pendingNexts;
|
||||
}
|
||||
|
||||
private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
|
||||
if (fwd.nextId() == null ||
|
||||
flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
|
||||
// fast path
|
||||
return false;
|
||||
}
|
||||
boolean queued = false;
|
||||
synchronized (pendingForwards) {
|
||||
// double check the flow objective store, because this block could run
|
||||
// after a notification arrives
|
||||
if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
|
||||
pendingForwards.compute(fwd.nextId(), (id, pending) -> {
|
||||
PendingNext next = new PendingNext(deviceId, fwd);
|
||||
if (pending == null) {
|
||||
return Sets.newHashSet(next);
|
||||
} else {
|
||||
pending.add(next);
|
||||
return pending;
|
||||
}
|
||||
});
|
||||
queued = true;
|
||||
}
|
||||
}
|
||||
if (queued) {
|
||||
log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
|
||||
fwd.id(), fwd.nextId(), deviceId);
|
||||
}
|
||||
return queued;
|
||||
}
|
||||
|
||||
/**
|
||||
* Task that passes the flow objective down to the driver. The task will
|
||||
* make a few attempts to find the appropriate driver, then eventually give
|
||||
* up and report an error if no suitable driver could be found.
|
||||
*/
|
||||
private class ObjectiveInstaller implements Runnable {
|
||||
private final DeviceId deviceId;
|
||||
private final Objective objective;
|
||||
|
||||
private final int numAttempts;
|
||||
|
||||
public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
|
||||
this(deviceId, objective, 1);
|
||||
}
|
||||
|
||||
public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
|
||||
this.deviceId = checkNotNull(deviceId);
|
||||
this.objective = checkNotNull(objective);
|
||||
this.numAttempts = checkNotNull(attemps);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Pipeliner pipeliner = getDevicePipeliner(deviceId);
|
||||
|
||||
if (pipeliner != null) {
|
||||
if (objective instanceof NextObjective) {
|
||||
pipeliner.next((NextObjective) objective);
|
||||
} else if (objective instanceof ForwardingObjective) {
|
||||
pipeliner.forward((ForwardingObjective) objective);
|
||||
} else {
|
||||
pipeliner.filter((FilteringObjective) objective);
|
||||
}
|
||||
//Attempts to check if pipeliner is null for retry attempts
|
||||
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
|
||||
Thread.sleep(INSTALL_RETRY_INTERVAL);
|
||||
executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
|
||||
} else {
|
||||
// Otherwise we've tried a few times and failed, report an
|
||||
// error back to the user.
|
||||
objective.context().ifPresent(
|
||||
c -> c.onError(objective, ObjectiveError.NOPIPELINER));
|
||||
}
|
||||
//Excpetion thrown
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception while installing flow objective", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
|
||||
@Override
|
||||
public void notify(ObjectiveEvent event) {
|
||||
if (event.type() == ObjectiveEvent.Type.ADD) {
|
||||
log.debug("Received notification of obj event {}", event);
|
||||
Set<PendingNext> pending;
|
||||
synchronized (pendingForwards) {
|
||||
// needs to be synchronized for queueObjective lookup
|
||||
pending = pendingForwards.remove(event.subject());
|
||||
}
|
||||
|
||||
if (pending == null) {
|
||||
log.debug("Nothing pending for this obj event {}", event);
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("Processing {} pending forwarding objectives for nextId {}",
|
||||
pending.size(), event.subject());
|
||||
pending.forEach(p -> getDevicePipeliner(p.deviceId())
|
||||
.forward(p.forwardingObjective()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves (if it exists) the device pipeline behaviour from the cache.
|
||||
* Otherwise it warms the caches and triggers the init method of the Pipeline.
|
||||
* For virtual network, it returns OVS pipeliner.
|
||||
*
|
||||
* @param deviceId the id of the device associated to the pipeline
|
||||
* @return the implementation of the Pipeliner behaviour
|
||||
*/
|
||||
private Pipeliner getDevicePipeliner(DeviceId deviceId) {
|
||||
return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates and initialize {@link Pipeliner}.
|
||||
* <p>
|
||||
* Note: Expected to be called under per-Device lock.
|
||||
* e.g., {@code pipeliners}' Map#compute family methods
|
||||
*
|
||||
* @param deviceId Device to initialize pipeliner
|
||||
* @return {@link Pipeliner} instance or null
|
||||
*/
|
||||
private Pipeliner initPipelineHandler(DeviceId deviceId) {
|
||||
//FIXME: do we need a standard pipeline for virtual device?
|
||||
Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
|
||||
pipeliner.init(deviceId, context);
|
||||
return pipeliner;
|
||||
}
|
||||
|
||||
// Processing context for initializing pipeline driver behaviours.
|
||||
private class InnerPipelineContext implements PipelinerContext {
|
||||
public ServiceDirectory directory() {
|
||||
return serviceDirectory;
|
||||
}
|
||||
|
||||
public FlowObjectiveStore store() {
|
||||
return flowObjectiveStore;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Data class used to hold a pending forwarding objective that could not
|
||||
* be processed because the associated next object was not present.
|
||||
*/
|
||||
private class PendingNext {
|
||||
private final DeviceId deviceId;
|
||||
private final ForwardingObjective fwd;
|
||||
|
||||
public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
|
||||
this.deviceId = deviceId;
|
||||
this.fwd = fwd;
|
||||
}
|
||||
|
||||
public DeviceId deviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
|
||||
public ForwardingObjective forwardingObjective() {
|
||||
return fwd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(deviceId, fwd);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!(obj instanceof PendingNext)) {
|
||||
return false;
|
||||
}
|
||||
final PendingNext other = (PendingNext) obj;
|
||||
if (this.deviceId.equals(other.deviceId) &&
|
||||
this.fwd.equals(other.fwd)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is a wrapping class from VirtualNetworkFlowObjectiveStore
|
||||
* to FlowObjectiveStore for PipelinerContext.
|
||||
*/
|
||||
private class StoreConvertor implements FlowObjectiveStore {
|
||||
|
||||
@Override
|
||||
public void setDelegate(FlowObjectiveStoreDelegate delegate) {
|
||||
virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
|
||||
virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasDelegate() {
|
||||
return virtualFlowObjectiveStore.hasDelegate(networkId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putNextGroup(Integer nextId, NextGroup group) {
|
||||
virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NextGroup getNextGroup(Integer nextId) {
|
||||
return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NextGroup removeNextGroup(Integer nextId) {
|
||||
return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Integer, NextGroup> getAllGroups() {
|
||||
return virtualFlowObjectiveStore.getAllGroups(networkId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int allocateNextId() {
|
||||
return virtualFlowObjectiveStore.allocateNextId(networkId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple single table pipeline abstraction for virtual networks.
|
||||
*/
|
||||
private class DefaultVirtualDevicePipeline
|
||||
extends AbstractHandlerBehaviour implements Pipeliner {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
private DeviceId deviceId;
|
||||
|
||||
private Cache<Integer, NextObjective> pendingNext;
|
||||
|
||||
private KryoNamespace appKryo = new KryoNamespace.Builder()
|
||||
.register(GroupKey.class)
|
||||
.register(DefaultGroupKey.class)
|
||||
.register(SingleGroup.class)
|
||||
.register(byte[].class)
|
||||
.build("DefaultVirtualDevicePipeline");
|
||||
|
||||
@Override
|
||||
public void init(DeviceId deviceId, PipelinerContext context) {
|
||||
this.deviceId = deviceId;
|
||||
|
||||
pendingNext = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(20, TimeUnit.SECONDS)
|
||||
.removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
|
||||
if (notification.getCause() == RemovalCause.EXPIRED) {
|
||||
notification.getValue().context()
|
||||
.ifPresent(c -> c.onError(notification.getValue(),
|
||||
ObjectiveError.FLOWINSTALLATIONFAILED));
|
||||
}
|
||||
}).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void filter(FilteringObjective filter) {
|
||||
|
||||
TrafficTreatment.Builder actions;
|
||||
switch (filter.type()) {
|
||||
case PERMIT:
|
||||
actions = (filter.meta() == null) ?
|
||||
DefaultTrafficTreatment.builder().punt() :
|
||||
DefaultTrafficTreatment.builder(filter.meta());
|
||||
break;
|
||||
case DENY:
|
||||
actions = (filter.meta() == null) ?
|
||||
DefaultTrafficTreatment.builder() :
|
||||
DefaultTrafficTreatment.builder(filter.meta());
|
||||
actions.drop();
|
||||
break;
|
||||
default:
|
||||
log.warn("Unknown filter type: {}", filter.type());
|
||||
actions = DefaultTrafficTreatment.builder().drop();
|
||||
}
|
||||
|
||||
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
|
||||
|
||||
filter.conditions().forEach(selector::add);
|
||||
|
||||
if (filter.key() != null) {
|
||||
selector.add(filter.key());
|
||||
}
|
||||
|
||||
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
|
||||
.forDevice(deviceId)
|
||||
.withSelector(selector.build())
|
||||
.withTreatment(actions.build())
|
||||
.fromApp(filter.appId())
|
||||
.withPriority(filter.priority());
|
||||
|
||||
if (filter.permanent()) {
|
||||
ruleBuilder.makePermanent();
|
||||
} else {
|
||||
ruleBuilder.makeTemporary(filter.timeout());
|
||||
}
|
||||
|
||||
installObjective(ruleBuilder, filter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forward(ForwardingObjective fwd) {
|
||||
TrafficSelector selector = fwd.selector();
|
||||
|
||||
if (fwd.treatment() != null) {
|
||||
// Deal with SPECIFIC and VERSATILE in the same manner.
|
||||
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
|
||||
.forDevice(deviceId)
|
||||
.withSelector(selector)
|
||||
.fromApp(fwd.appId())
|
||||
.withPriority(fwd.priority())
|
||||
.withTreatment(fwd.treatment());
|
||||
|
||||
if (fwd.permanent()) {
|
||||
ruleBuilder.makePermanent();
|
||||
} else {
|
||||
ruleBuilder.makeTemporary(fwd.timeout());
|
||||
}
|
||||
installObjective(ruleBuilder, fwd);
|
||||
|
||||
} else {
|
||||
NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
|
||||
if (nextObjective != null) {
|
||||
pendingNext.invalidate(fwd.nextId());
|
||||
nextObjective.next().forEach(treat -> {
|
||||
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
|
||||
.forDevice(deviceId)
|
||||
.withSelector(selector)
|
||||
.fromApp(fwd.appId())
|
||||
.withPriority(fwd.priority())
|
||||
.withTreatment(treat);
|
||||
|
||||
if (fwd.permanent()) {
|
||||
ruleBuilder.makePermanent();
|
||||
} else {
|
||||
ruleBuilder.makeTemporary(fwd.timeout());
|
||||
}
|
||||
installObjective(ruleBuilder, fwd);
|
||||
});
|
||||
} else {
|
||||
fwd.context().ifPresent(c -> c.onError(fwd,
|
||||
ObjectiveError.GROUPMISSING));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
|
||||
FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
|
||||
switch (objective.op()) {
|
||||
|
||||
case ADD:
|
||||
flowBuilder.add(ruleBuilder.build());
|
||||
break;
|
||||
case REMOVE:
|
||||
flowBuilder.remove(ruleBuilder.build());
|
||||
break;
|
||||
default:
|
||||
log.warn("Unknown operation {}", objective.op());
|
||||
}
|
||||
|
||||
flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
|
||||
@Override
|
||||
public void onSuccess(FlowRuleOperations ops) {
|
||||
objective.context().ifPresent(context -> context.onSuccess(objective));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(FlowRuleOperations ops) {
|
||||
objective.context()
|
||||
.ifPresent(context ->
|
||||
context.onError(objective,
|
||||
ObjectiveError.FLOWINSTALLATIONFAILED));
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void next(NextObjective nextObjective) {
|
||||
|
||||
pendingNext.put(nextObjective.id(), nextObjective);
|
||||
flowObjectiveStore.putNextGroup(nextObjective.id(),
|
||||
new SingleGroup(
|
||||
new DefaultGroupKey(
|
||||
appKryo.serialize(nextObjective.id()))));
|
||||
nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getNextMappings(NextGroup nextGroup) {
|
||||
// Default single table pipeline does not use nextObjectives or groups
|
||||
return null;
|
||||
}
|
||||
|
||||
private class SingleGroup implements NextGroup {
|
||||
|
||||
private final GroupKey key;
|
||||
|
||||
public SingleGroup(GroupKey key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
public GroupKey key() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] data() {
|
||||
return appKryo.serialize(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -58,6 +58,7 @@ import org.onosproject.net.PortNumber;
|
||||
import org.onosproject.net.device.DeviceService;
|
||||
import org.onosproject.net.flow.FlowRuleService;
|
||||
import org.onosproject.net.group.GroupService;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveService;
|
||||
import org.onosproject.net.host.HostService;
|
||||
import org.onosproject.net.intent.IntentEvent;
|
||||
import org.onosproject.net.intent.IntentListener;
|
||||
@ -425,6 +426,8 @@ public class VirtualNetworkManager
|
||||
service = new VirtualNetworkPacketManager(this, network.id());
|
||||
} else if (serviceKey.serviceClass.equals(GroupService.class)) {
|
||||
service = new VirtualNetworkGroupManager(this, network.id());
|
||||
} else if (serviceKey.serviceClass.equals(FlowObjectiveService.class)) {
|
||||
service = new VirtualNetworkFlowObjectiveManager(this, network.id());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -0,0 +1,198 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.incubator.net.virtual.impl;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.onlab.junit.TestTools;
|
||||
import org.onlab.junit.TestUtils;
|
||||
import org.onlab.osgi.ServiceDirectory;
|
||||
import org.onlab.osgi.TestServiceDirectory;
|
||||
import org.onosproject.TestApplicationId;
|
||||
import org.onosproject.common.event.impl.TestEventDispatcher;
|
||||
import org.onosproject.core.ApplicationId;
|
||||
import org.onosproject.core.CoreService;
|
||||
import org.onosproject.event.EventDeliveryService;
|
||||
import org.onosproject.incubator.net.virtual.NetworkId;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetwork;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
|
||||
import org.onosproject.incubator.net.virtual.event.VirtualEvent;
|
||||
import org.onosproject.incubator.net.virtual.event.VirtualListenerRegistryManager;
|
||||
import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
|
||||
import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
|
||||
import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
|
||||
import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
|
||||
import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
|
||||
import org.onosproject.net.NetTestTools;
|
||||
import org.onosproject.net.flow.DefaultTrafficSelector;
|
||||
import org.onosproject.net.flow.DefaultTrafficTreatment;
|
||||
import org.onosproject.net.flow.FlowRule;
|
||||
import org.onosproject.net.flow.FlowRuleBatchOperation;
|
||||
import org.onosproject.net.flow.TrafficSelector;
|
||||
import org.onosproject.net.flow.TrafficTreatment;
|
||||
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveService;
|
||||
import org.onosproject.net.flowobjective.ForwardingObjective;
|
||||
import org.onosproject.net.intent.FakeIntentManager;
|
||||
import org.onosproject.net.intent.TestableIntentService;
|
||||
import org.onosproject.net.provider.ProviderId;
|
||||
import org.onosproject.store.service.StorageService;
|
||||
import org.onosproject.store.service.TestStorageService;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Junit tests for VirtualNetworkFlowObjectiveManager.
|
||||
*/
|
||||
public class VirtualNetworkFlowObjectiveManagerTest
|
||||
extends VirtualNetworkTestUtil {
|
||||
|
||||
private static final int RETRY_MS = 250;
|
||||
|
||||
private VirtualNetworkManager manager;
|
||||
private DistributedVirtualNetworkStore virtualNetworkManagerStore;
|
||||
private TestableIntentService intentService = new FakeIntentManager();
|
||||
private ServiceDirectory testDirectory;
|
||||
private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
|
||||
|
||||
private VirtualProviderManager providerRegistryService;
|
||||
private EventDeliveryService eventDeliveryService;
|
||||
VirtualListenerRegistryManager listenerRegistryManager =
|
||||
VirtualListenerRegistryManager.getInstance();
|
||||
|
||||
private ApplicationId appId;
|
||||
|
||||
private VirtualNetwork vnet1;
|
||||
private VirtualNetwork vnet2;
|
||||
|
||||
private FlowObjectiveService service1;
|
||||
private FlowObjectiveService service2;
|
||||
|
||||
//FIXME: referring flowrule service, store, and provider shouldn't be here
|
||||
private VirtualFlowRuleProvider flowRuleProvider = new TestProvider();
|
||||
private SimpleVirtualFlowRuleStore flowRuleStore;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
|
||||
|
||||
CoreService coreService = new TestCoreService();
|
||||
TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
|
||||
StorageService storageService = new TestStorageService();
|
||||
TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
|
||||
virtualNetworkManagerStore.activate();
|
||||
|
||||
flowObjectiveStore = new SimpleVirtualFlowObjectiveStore();
|
||||
TestUtils.setField(flowObjectiveStore, "storageService", storageService);
|
||||
flowObjectiveStore.activate();
|
||||
flowRuleStore = new SimpleVirtualFlowRuleStore();
|
||||
flowRuleStore.activate();
|
||||
|
||||
manager = new VirtualNetworkManager();
|
||||
manager.store = virtualNetworkManagerStore;
|
||||
manager.intentService = intentService;
|
||||
TestUtils.setField(manager, "coreService", coreService);
|
||||
|
||||
providerRegistryService = new VirtualProviderManager();
|
||||
providerRegistryService.registerProvider(flowRuleProvider);
|
||||
|
||||
eventDeliveryService = new TestEventDispatcher();
|
||||
NetTestTools.injectEventDispatcher(manager, eventDeliveryService);
|
||||
eventDeliveryService.addSink(VirtualEvent.class, listenerRegistryManager);
|
||||
|
||||
appId = new TestApplicationId("FlowRuleManagerTest");
|
||||
|
||||
testDirectory = new TestServiceDirectory()
|
||||
.add(VirtualNetworkStore.class, virtualNetworkManagerStore)
|
||||
.add(CoreService.class, coreService)
|
||||
.add(EventDeliveryService.class, eventDeliveryService)
|
||||
.add(VirtualProviderRegistryService.class, providerRegistryService)
|
||||
.add(VirtualNetworkFlowRuleStore.class, flowRuleStore)
|
||||
.add(VirtualNetworkFlowObjectiveStore.class, flowObjectiveStore);
|
||||
TestUtils.setField(manager, "serviceDirectory", testDirectory);
|
||||
|
||||
manager.activate();
|
||||
|
||||
vnet1 = setupVirtualNetworkTopology(manager, TID1);
|
||||
vnet2 = setupVirtualNetworkTopology(manager, TID2);
|
||||
|
||||
service1 = new VirtualNetworkFlowObjectiveManager(manager, vnet1.id());
|
||||
service2 = new VirtualNetworkFlowObjectiveManager(manager, vnet2.id());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownTest() {
|
||||
manager.deactivate();
|
||||
virtualNetworkManagerStore.deactivate();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests adding a forwarding objective.
|
||||
*/
|
||||
@Test
|
||||
public void forwardingObjective() {
|
||||
TrafficSelector selector = DefaultTrafficSelector.emptySelector();
|
||||
TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
|
||||
ForwardingObjective forward =
|
||||
DefaultForwardingObjective.builder()
|
||||
.fromApp(NetTestTools.APP_ID)
|
||||
.withFlag(ForwardingObjective.Flag.SPECIFIC)
|
||||
.withSelector(selector)
|
||||
.withTreatment(treatment)
|
||||
.makePermanent()
|
||||
.add();
|
||||
|
||||
service1.forward(VDID1, forward);
|
||||
|
||||
TestTools.assertAfter(RETRY_MS, () ->
|
||||
assertEquals("1 flowrule entry expected",
|
||||
1, flowRuleStore.getFlowRuleCount(vnet1.id())));
|
||||
TestTools.assertAfter(RETRY_MS, () ->
|
||||
assertEquals("0 flowrule entry expected",
|
||||
0, flowRuleStore.getFlowRuleCount(vnet2.id())));
|
||||
}
|
||||
|
||||
//TODO: More test cases for filter, foward, and next
|
||||
|
||||
private class TestProvider extends AbstractVirtualProvider
|
||||
implements VirtualFlowRuleProvider {
|
||||
|
||||
protected TestProvider() {
|
||||
super(new ProviderId("test", "org.onosproject.virtual.testprovider"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyFlowRule(NetworkId networkId, FlowRule... flowRules) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeFlowRule(NetworkId networkId, FlowRule... flowRules) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
@ -40,6 +40,7 @@ import org.onosproject.incubator.net.virtual.VirtualHost;
|
||||
import org.onosproject.incubator.net.virtual.VirtualLink;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetwork;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkGroupStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
|
||||
@ -54,6 +55,7 @@ import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManage
|
||||
import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderService;
|
||||
import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
|
||||
import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualGroupStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualPacketStore;
|
||||
@ -833,7 +835,8 @@ public class VirtualNetworkManagerTest extends VirtualNetworkTestUtil {
|
||||
.add(ClusterService.class, new ClusterServiceAdapter())
|
||||
.add(VirtualNetworkFlowRuleStore.class, new SimpleVirtualFlowRuleStore())
|
||||
.add(VirtualNetworkPacketStore.class, new SimpleVirtualPacketStore())
|
||||
.add(VirtualNetworkGroupStore.class, new SimpleVirtualGroupStore());
|
||||
.add(VirtualNetworkGroupStore.class, new SimpleVirtualGroupStore())
|
||||
.add(VirtualNetworkFlowObjectiveStore.class, new SimpleVirtualFlowObjectiveStore());
|
||||
|
||||
validateServiceGetReturnsSavedInstance(virtualNetwork.id(), FlowRuleService.class);
|
||||
validateServiceGetReturnsSavedInstance(virtualNetwork.id(), PacketService.class);
|
||||
|
@ -33,13 +33,18 @@ import org.onosproject.event.EventDeliveryService;
|
||||
import org.onosproject.incubator.net.virtual.NetworkId;
|
||||
import org.onosproject.incubator.net.virtual.VirtualDevice;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetwork;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
|
||||
import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
|
||||
import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
|
||||
import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
|
||||
import org.onosproject.incubator.net.virtual.provider.VirtualPacketProvider;
|
||||
import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
|
||||
import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
|
||||
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualPacketStore;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.NetTestTools;
|
||||
@ -49,6 +54,8 @@ import org.onosproject.net.flow.TrafficSelector;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
|
||||
import org.onosproject.net.flowobjective.ForwardingObjective;
|
||||
import org.onosproject.net.flowobjective.Objective;
|
||||
import org.onosproject.net.flow.FlowRule;
|
||||
import org.onosproject.net.flow.FlowRuleBatchOperation;
|
||||
import org.onosproject.net.intent.FakeIntentManager;
|
||||
import org.onosproject.net.intent.TestableIntentService;
|
||||
import org.onosproject.net.packet.DefaultOutboundPacket;
|
||||
@ -57,6 +64,7 @@ import org.onosproject.net.packet.PacketContext;
|
||||
import org.onosproject.net.packet.PacketPriority;
|
||||
import org.onosproject.net.packet.PacketProcessor;
|
||||
import org.onosproject.net.provider.ProviderId;
|
||||
import org.onosproject.store.service.StorageService;
|
||||
import org.onosproject.store.service.TestStorageService;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
@ -95,12 +103,17 @@ public class VirtualNetworkPacketManagerTest extends VirtualNetworkTestUtil {
|
||||
|
||||
private ApplicationId appId = new TestApplicationId("VirtualPacketManagerTest");
|
||||
|
||||
private VirtualFlowRuleProvider flowRuleProvider = new TestFlowRuleProvider();
|
||||
private SimpleVirtualFlowRuleStore flowRuleStore;
|
||||
private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
|
||||
|
||||
@Before
|
||||
public void setUp() throws TestUtils.TestUtilsException {
|
||||
virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
|
||||
|
||||
TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
|
||||
TestUtils.setField(virtualNetworkManagerStore, "storageService", new TestStorageService());
|
||||
StorageService storageService = new TestStorageService();
|
||||
TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
|
||||
virtualNetworkManagerStore.activate();
|
||||
|
||||
manager = new VirtualNetworkManager();
|
||||
@ -109,8 +122,15 @@ public class VirtualNetworkPacketManagerTest extends VirtualNetworkTestUtil {
|
||||
manager.intentService = intentService;
|
||||
NetTestTools.injectEventDispatcher(manager, new TestEventDispatcher());
|
||||
|
||||
flowObjectiveStore = new SimpleVirtualFlowObjectiveStore();
|
||||
TestUtils.setField(flowObjectiveStore, "storageService", storageService);
|
||||
flowObjectiveStore.activate();
|
||||
flowRuleStore = new SimpleVirtualFlowRuleStore();
|
||||
flowRuleStore.activate();
|
||||
|
||||
providerRegistryService = new VirtualProviderManager();
|
||||
providerRegistryService.registerProvider(provider);
|
||||
providerRegistryService.registerProvider(flowRuleProvider);
|
||||
|
||||
testDirectory = new TestServiceDirectory()
|
||||
.add(VirtualNetworkStore.class, virtualNetworkManagerStore)
|
||||
@ -118,6 +138,8 @@ public class VirtualNetworkPacketManagerTest extends VirtualNetworkTestUtil {
|
||||
.add(VirtualProviderRegistryService.class, providerRegistryService)
|
||||
.add(EventDeliveryService.class, eventDeliveryService)
|
||||
.add(ClusterService.class, new ClusterServiceAdapter())
|
||||
.add(VirtualNetworkFlowRuleStore.class, flowRuleStore)
|
||||
.add(VirtualNetworkFlowObjectiveStore.class, flowObjectiveStore)
|
||||
.add(VirtualNetworkPacketStore.class, packetStore);
|
||||
TestUtils.setField(manager, "serviceDirectory", testDirectory);
|
||||
|
||||
@ -373,4 +395,27 @@ public class VirtualNetworkPacketManagerTest extends VirtualNetworkTestUtil {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class TestFlowRuleProvider extends AbstractVirtualProvider
|
||||
implements VirtualFlowRuleProvider {
|
||||
|
||||
protected TestFlowRuleProvider() {
|
||||
super(new ProviderId("test", "org.onosproject.virtual.testprovider"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyFlowRule(NetworkId networkId, FlowRule... flowRules) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeFlowRule(NetworkId networkId, FlowRule... flowRules) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,153 @@
|
||||
/*
|
||||
* Copyright 2017-present Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.incubator.store.virtual.impl;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Reference;
|
||||
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
||||
import org.apache.felix.scr.annotations.Service;
|
||||
import org.onosproject.incubator.net.virtual.NetworkId;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
|
||||
import org.onosproject.net.behaviour.DefaultNextGroup;
|
||||
import org.onosproject.net.behaviour.NextGroup;
|
||||
import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
|
||||
import org.onosproject.net.flowobjective.ObjectiveEvent;
|
||||
import org.onosproject.store.service.AtomicCounter;
|
||||
import org.onosproject.store.service.StorageService;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import static org.onlab.util.Tools.groupedThreads;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
/**
|
||||
* Single instance implementation of store to manage
|
||||
* the inventory of created next groups for virtual network.
|
||||
*/
|
||||
@Component(immediate = true)
|
||||
@Service
|
||||
public class SimpleVirtualFlowObjectiveStore
|
||||
extends AbstractVirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
|
||||
implements VirtualNetworkFlowObjectiveStore {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
private ConcurrentMap<NetworkId, ConcurrentMap<Integer, byte[]>> nextGroupsMap;
|
||||
|
||||
private AtomicCounter nextIds;
|
||||
|
||||
// event queue to separate map-listener threads from event-handler threads (tpool)
|
||||
private BlockingQueue<VirtualObjectiveEvent> eventQ;
|
||||
private ExecutorService tpool;
|
||||
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||
protected StorageService storageService;
|
||||
|
||||
@Activate
|
||||
public void activate() {
|
||||
tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/virtual/flobj-notifier", "%d", log));
|
||||
eventQ = new LinkedBlockingQueue<>();
|
||||
tpool.execute(new FlowObjectiveNotifier());
|
||||
|
||||
nextGroupsMap = Maps.newConcurrentMap();
|
||||
|
||||
nextIds = storageService.getAtomicCounter("next-objective-counter");
|
||||
log.info("Started");
|
||||
}
|
||||
|
||||
private ConcurrentMap<Integer, byte[]> getNextGroups(NetworkId networkId) {
|
||||
nextGroupsMap.computeIfAbsent(networkId, n -> Maps.newConcurrentMap());
|
||||
return nextGroupsMap.get(networkId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group) {
|
||||
ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
|
||||
nextGroups.put(nextId, group.data());
|
||||
|
||||
eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.ADD, nextId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public NextGroup getNextGroup(NetworkId networkId, Integer nextId) {
|
||||
ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
|
||||
return new DefaultNextGroup(nextGroups.get(nextId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public NextGroup removeNextGroup(NetworkId networkId, Integer nextId) {
|
||||
ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
|
||||
eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.REMOVE, nextId));
|
||||
return new DefaultNextGroup(nextGroups.remove(nextId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Integer, NextGroup> getAllGroups(NetworkId networkId) {
|
||||
ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
|
||||
|
||||
Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
|
||||
for (int key : nextGroups.keySet()) {
|
||||
NextGroup nextGroup = getNextGroup(networkId, key);
|
||||
if (nextGroup != null) {
|
||||
nextGroupMappings.put(key, nextGroup);
|
||||
}
|
||||
}
|
||||
return nextGroupMappings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int allocateNextId(NetworkId networkId) {
|
||||
return (int) nextIds.incrementAndGet();
|
||||
}
|
||||
|
||||
private class FlowObjectiveNotifier implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
VirtualObjectiveEvent vEvent = eventQ.take();
|
||||
notifyDelegate(vEvent.networkId(), vEvent);
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class VirtualObjectiveEvent extends ObjectiveEvent {
|
||||
NetworkId networkId;
|
||||
|
||||
public VirtualObjectiveEvent(NetworkId networkId, Type type,
|
||||
Integer objective) {
|
||||
super(type, objective);
|
||||
this.networkId = networkId;
|
||||
}
|
||||
|
||||
NetworkId networkId() {
|
||||
return networkId;
|
||||
}
|
||||
}
|
||||
}
|
@ -151,8 +151,12 @@ public class SimpleVirtualFlowRuleStore
|
||||
|
||||
@Override
|
||||
public int getFlowRuleCount(NetworkId networkId) {
|
||||
|
||||
int sum = 0;
|
||||
|
||||
if (flowEntries.get(networkId) == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft :
|
||||
flowEntries.get(networkId).values()) {
|
||||
for (List<StoredFlowEntry> fes : ft.values()) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user