ONOS-5450 Initial implementation of OFAgent

- Refactored OFAgent immutable
- Added OFAgentStore and OFAgentEvent
- Implemented OFAgentManager and OFSwitchManager
- Added unit tests

Change-Id: Ie39ad2db9e6bd6259a062371b3ffe116b8c8cc52
This commit is contained in:
Hyunsun Moon 2017-03-14 03:25:52 +09:00 committed by Yoonseon Han
parent e9e457137c
commit f4ba44f7fc
25 changed files with 1878 additions and 479 deletions

View File

@ -1,19 +1,30 @@
COMPILE_DEPS = [ COMPILE_DEPS = [
'//lib:CORE_DEPS', '//lib:CORE_DEPS',
'//core/store/serializers:onos-core-serializers',
'//core/common:onos-core-common',
'//incubator/api:onos-incubator-api',
'//cli:onos-cli',
'//lib:org.apache.karaf.shell.console',
'//lib:netty-transport', '//lib:netty-transport',
'//lib:netty-buffer', '//lib:netty-buffer',
'//lib:netty-codec', '//lib:netty-codec',
'//lib:netty-handler', '//lib:netty-handler',
'//incubator/api:onos-incubator-api',
'//lib:openflowj-3.0', '//lib:openflowj-3.0',
] ]
TEST_DEPS = [
'//lib:TEST_ADAPTERS',
'//core/api:onos-api-tests',
'//core/common:onos-core-common-tests',
]
EXCLUDED_BUNDLES = [ EXCLUDED_BUNDLES = [
'//lib:openflowj-3.0', '//lib:openflowj-3.0',
] ]
osgi_jar_with_tests ( osgi_jar_with_tests (
deps = COMPILE_DEPS, deps = COMPILE_DEPS,
test_deps = TEST_DEPS,
) )
onos_app ( onos_app (

View File

@ -51,6 +51,71 @@
<groupId>org.osgi</groupId> <groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId> <artifactId>org.osgi.compendium</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.projectfloodlight</groupId> <groupId>org.projectfloodlight</groupId>
<artifactId>openflowj</artifactId> <artifactId>openflowj</artifactId>
@ -58,27 +123,8 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId> <artifactId>netty-all</artifactId>
</dependency> <version>${netty4.version}</version>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-of-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-api</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -15,19 +15,29 @@
*/ */
package org.onosproject.ofagent.api; package org.onosproject.ofagent.api;
import io.netty.channel.nio.NioEventLoopGroup;
import org.onosproject.incubator.net.virtual.NetworkId; import org.onosproject.incubator.net.virtual.NetworkId;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
/** /**
* Representation of an OF agent, which brokers virtual devices and external * Representation of an OpenFlow agent, which holds the mapping between the virtual
* controllers by handling OpenFlow connections and messages between them. * network and the external OpenFlow controllers.
*/ */
public interface OFAgent { public interface OFAgent {
enum State {
/**
* Specifies that the ofagent state is started.
*/
STARTED,
/**
* Specifies that the ofagent state is stopped.
*/
STOPPED
}
/** /**
* Returns the identifier of the virtual network that this agent cares for. * Returns the identifier of the virtual network that this agent cares for.
* *
@ -43,14 +53,11 @@ public interface OFAgent {
Set<OFController> controllers(); Set<OFController> controllers();
/** /**
* Starts the OpenFlow agent. * Returns the admin state of the agent.
*
* @return state
*/ */
void start(); State state();
/**
* Stops the OpenFlow agent.
*/
void stop();
/** /**
* Builder of OF agent entities. * Builder of OF agent entities.
@ -72,15 +79,6 @@ public interface OFAgent {
*/ */
Builder networkId(NetworkId networkId); Builder networkId(NetworkId networkId);
/**
* Returns OF agent builder with the supplied network services for the
* virtual network.
*
* @param services network services for the virtual network
* @return of agent builder
*/
Builder services(Map<Class<?>, Object> services);
/** /**
* Returns OF agent builder with the supplied controllers. * Returns OF agent builder with the supplied controllers.
* *
@ -90,19 +88,11 @@ public interface OFAgent {
Builder controllers(Set<OFController> controllers); Builder controllers(Set<OFController> controllers);
/** /**
* Returns OF agent builder with the supplied event executor. * Returns OF agent builder with the supplied state.
* *
* @param eventExecutor event executor * @param state state of the agent
* @return of agent builder * @return of agent builder
*/ */
Builder eventExecutor(ExecutorService eventExecutor); Builder state(State state);
/**
* Returns OF agent builder with the supplied IO work group.
*
* @param ioWorker io worker group
* @return of agent builder
*/
Builder ioWorker(NioEventLoopGroup ioWorker);
} }
} }

View File

@ -0,0 +1,59 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.api;
import org.onosproject.incubator.net.virtual.NetworkId;
/**
* Service for administering the inventory of OpenFlow agents.
*/
public interface OFAgentAdminService {
/**
* Creates an OpenFlow agent for a given virtual network with given controllers.
*
* @param ofAgent the new ofagent
*/
void createAgent(OFAgent ofAgent);
/**
* Updates the agent.
*
* @param ofAgent updated ofagent
*/
void updateAgent(OFAgent ofAgent);
/**
* Removes the OpenFlow agent for the given virtual network.
*
* @param networkId virtual network identifier
*/
void removeAgent(NetworkId networkId);
/**
* Starts the agent for the given network.
*
* @param networkId virtual network identifier
*/
void startAgent(NetworkId networkId);
/**
* Stops the agent for the given network.
*
* @param networkId virtual network identifier
*/
void stopAgent(NetworkId networkId);
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.api;
import org.onosproject.event.AbstractEvent;
/**
* Describes OFAgent event.
*/
public class OFAgentEvent extends AbstractEvent<OFAgentEvent.Type, OFAgent> {
private final OFController controller;
public enum Type {
/**
* Signifies that a new OFAgent is created.
*/
OFAGENT_CREATED,
/**
* Signifies that the OFAgent is removed.
*/
OFAGENT_REMOVED,
/**
* Signifies that the new external controller is added.
*/
OFAGENT_CONTROLLER_ADDED,
/**
* Signifies that the external controller is removed.
*/
OFAGENT_CONTROLLER_REMOVED,
/**
* Signifies that the OFAgent is started.
*/
OFAGENT_STARTED,
/**
* Signifies that the OFAgent is stopped.
*/
OFAGENT_STOPPED,
}
/**
* Creates an event of a given type for the specified ofagent and the current time.
*
* @param type ofagent event type
* @param ofAgent ofagent instance
*/
public OFAgentEvent(OFAgentEvent.Type type, OFAgent ofAgent) {
super(type, ofAgent);
this.controller = null;
}
/**
* Creates an event of a given type for the specified ofagent and the updated controller.
*
* @param type ofagent event type
* @param ofAgent ofagent instance
* @param controller updated external controller
*/
public OFAgentEvent(OFAgentEvent.Type type, OFAgent ofAgent, OFController controller) {
super(type, ofAgent);
this.controller = controller;
}
/**
* Returns the updated controller.
*
* @return updated controller; null if the event is not controller related
*/
public OFController controller() {
return this.controller;
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.api;
import org.onosproject.event.EventListener;
/**
* Listener for OFAgent events.
*/
public interface OFAgentListener extends EventListener<OFAgentEvent> {
}

View File

@ -15,6 +15,7 @@
*/ */
package org.onosproject.ofagent.api; package org.onosproject.ofagent.api;
import org.onosproject.event.ListenerService;
import org.onosproject.incubator.net.virtual.NetworkId; import org.onosproject.incubator.net.virtual.NetworkId;
import java.util.Set; import java.util.Set;
@ -22,7 +23,9 @@ import java.util.Set;
/** /**
* Service for administering OF agents for a virtual network. * Service for administering OF agents for a virtual network.
*/ */
public interface OFAgentService { public interface OFAgentService extends ListenerService<OFAgentEvent, OFAgentListener> {
String APPLICATION_NAME = "org.onosproject.ofagent";
/** /**
* Returns the OpenFlow agent list. * Returns the OpenFlow agent list.
@ -32,39 +35,10 @@ public interface OFAgentService {
Set<OFAgent> agents(); Set<OFAgent> agents();
/** /**
* Creates an OpenFlow agent for a given virtual network with given controllers. * Returns the agent for the given network.
*
* @param networkId id of the virtual network
* @param controllers list of controllers
*/
void createAgent(NetworkId networkId, OFController... controllers);
/**
* Removes the OpenFlow agent for the given virtual network.
*
* @param networkId virtual network identifier
*/
void removeAgent(NetworkId networkId);
/**
* Starts the agent for the given network.
*
* @param networkId virtual network identifier
*/
void startAgent(NetworkId networkId);
/**
* Stops the agent for the given network.
*
* @param networkId virtual network identifier
*/
void stopAgent(NetworkId networkId);
/**
* Returns if the agent of the given network is active or not.
* *
* @param networkId network id * @param networkId network id
* @return true if the agent is active * @return ofagent; null if no ofagent exists for the network
*/ */
boolean isActive(NetworkId networkId); OFAgent agent(NetworkId networkId);
} }

View File

@ -0,0 +1,64 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.api;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.store.Store;
import java.util.Set;
/**
* Manages inventory of OpenFlow agent states; not intended for direct use.
*/
public interface OFAgentStore extends Store<OFAgentEvent, OFAgentStoreDelegate> {
/**
* Creates the new openflow agent.
*
* @param ofAgent the new ofagent
*/
void createOfAgent(OFAgent ofAgent);
/**
* Updates the openflow agent.
*
* @param ofAgent the updated ofagent
*/
void updateOfAgent(OFAgent ofAgent);
/**
* Removes the openflow agent for the supplied network ID.
*
* @param networkId virtual network identifier
* @return removed agent; null if remove failed
*/
OFAgent removeOfAgent(NetworkId networkId);
/**
* Returns the openflow agent with the supplied network ID.
*
* @param networkId virtual network identifier
* @return ofagent; null if no ofagent exists for the network
*/
OFAgent ofAgent(NetworkId networkId);
/**
* Returns all openflow agents.
*
* @return set of ofagents; empty set if no ofagents exist
*/
Set<OFAgent> ofAgents();
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.api;
import org.onosproject.store.StoreDelegate;
/**
* OFAgent network store delegate abstraction.
*/
public interface OFAgentStoreDelegate extends StoreDelegate<OFAgentEvent> {
}

View File

@ -15,19 +15,19 @@
*/ */
package org.onosproject.ofagent.api; package org.onosproject.ofagent.api;
import org.onosproject.net.Device; import org.projectfloodlight.openflow.types.DatapathId;
/** /**
* Representation of virtual OpenFlow switch. * Representation of virtual OpenFlow switch.
*/ */
public interface OFSwitch extends OFSwitchService, OFControllerRoleService { public interface OFSwitch extends OFSwitchOperationService, OFControllerRoleService {
/** /**
* Returns the device information. * Returns the device information.
* *
* @return virtual device * @return virtual device
*/ */
Device device(); DatapathId dpid();
/** /**
* Returns the capabilities of the switch. * Returns the capabilities of the switch.
@ -35,11 +35,4 @@ public interface OFSwitch extends OFSwitchService, OFControllerRoleService {
* @return capabilities * @return capabilities
*/ */
OFSwitchCapabilities capabilities(); OFSwitchCapabilities capabilities();
/**
* Returns if the switch is connected to controllers or not.
*
* @return true if the switch is connected, false otherwise
*/
boolean isConnected();
} }

View File

@ -0,0 +1,129 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.api;
import io.netty.channel.Channel;
import org.onosproject.net.Port;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.packet.InboundPacket;
import org.projectfloodlight.openflow.protocol.OFMessage;
/**
* Service for providing OpenFlow operations.
*/
public interface OFSwitchOperationService {
/**
* Processes a new port of the switch.
* It sends out FEATURE_REPLY message to the controllers.
*
* @param port virtual port
*/
void processPortAdded(Port port);
/**
* Processes port link down.
* It sends out PORT_STATUS asynchronous message to the controllers.
*
* @param port virtual port
*/
void processPortDown(Port port);
/**
* Processes port link down.
* It sends out PORT_STATUS asynchronous message to the controllers.
*
* @param port virtual port
*/
void processPortUp(Port port);
/**
* Processes flow removed.
* It sends out FLOW_REMOVED asynchronous message to the controllers.
*
* @param flowRule removed flow rule
*/
void processFlowRemoved(FlowRule flowRule);
/**
* Processes packet in.
* It sends out PACKET_IN asynchronous message to the controllers.
*
* @param packet inbound packet
*/
void processPacketIn(InboundPacket packet);
/**
* Processes commands from the controllers that modify the state of the switch.
* Possible message types include PACKET_OUT, FLOW_MOD, GROUP_MOD,
* PORT_MOD, TABLE_MOD. These types of messages can be denied based on a
* role of the request controller.
*
* @param channel received channel
* @param msg command message received
*/
void processControllerCommand(Channel channel, OFMessage msg);
/**
* Processes a stats request from the controllers.
* Targeted message type is MULTIPART_REQUEST with FLOW, PORT, GROUP,
* GROUP_DESC subtypes.
*
* @param channel received channel
* @param msg stats request message received
*/
void processStatsRequest(Channel channel, OFMessage msg);
/**
* Processes a role request from the controllers.
* Targeted message type is ROLE_REQUEST.
*
* @param channel received channel
* @param msg role request message received
*/
void processRoleRequest(Channel channel, OFMessage msg);
/**
* Processes a features request from the controllers.
*
* @param channel received channel
* @param msg received features request
*/
void processFeaturesRequest(Channel channel, OFMessage msg);
/**
* Processes LLDP packets from the controller.
*
* @param channel received channel
* @param msg packet out message with lldp
*/
void processLldp(Channel channel, OFMessage msg);
/**
* Sends hello to the controller.
*
* @param channel received channel
*/
void sendOfHello(Channel channel);
/**
* Processes echo request from the controllers.
*
* @param channel received channel
* @param msg echo request message
*/
void processEchoRequest(Channel channel, OFMessage msg);
}

View File

@ -15,125 +15,27 @@
*/ */
package org.onosproject.ofagent.api; package org.onosproject.ofagent.api;
import io.netty.channel.Channel; import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.net.Port;
import org.onosproject.net.flow.FlowRule; import java.util.Set;
import org.onosproject.net.packet.InboundPacket;
import org.projectfloodlight.openflow.protocol.OFMessage;
/** /**
* Service providing OpenFlow switch operations. * Service for providing virtual OpenFlow switch information.
*/ */
public interface OFSwitchService { public interface OFSwitchService {
/** /**
* Handles the switch starts. * Returns all openflow switches that OF agent service manages.
*/
void started();
/**
* Handles the switch stops.
*/
void stopped();
/**
* Processes a new port of the switch.
* It sends out FEATURE_REPLY message to the controllers.
* *
* @param port virtual port * @return set of openflow switches; empty set if no openflow switches exist
*/ */
void processPortAdded(Port port); Set<OFSwitch> ofSwitches();
/** /**
* Processes port link down. * Returns all openflow switches for the specified network.
* It sends out PORT_STATUS asynchronous message to the controllers.
* *
* @param port virtual port * @param networkId network id
* @return set of openflow switches; empty set if no devices exist on the network
*/ */
void processPortDown(Port port); Set<OFSwitch> ofSwitches(NetworkId networkId);
/**
* Processes port link down.
* It sends out PORT_STATUS asynchronous message to the controllers.
*
* @param port virtual port
*/
void processPortUp(Port port);
/**
* Processes flow removed.
* It sends out FLOW_REMOVED asynchronous message to the controllers.
*
* @param flowRule removed flow rule
*/
void processFlowRemoved(FlowRule flowRule);
/**
* Processes packet in.
* It sends out PACKET_IN asynchronous message to the controllers.
*
* @param packet inbound packet
*/
void processPacketIn(InboundPacket packet);
/**
* Processes commands from the controllers that modify the state of the switch.
* Possible message types include PACKET_OUT, FLOW_MOD, GROUP_MOD,
* PORT_MOD, TABLE_MOD. These types of messages can be denied based on a
* role of the request controller.
*
* @param channel received channel
* @param msg command message received
*/
void processControllerCommand(Channel channel, OFMessage msg);
/**
* Processes a stats request from the controllers.
* Targeted message type is MULTIPART_REQUEST with FLOW, PORT, GROUP,
* GROUP_DESC subtypes.
*
* @param channel received channel
* @param msg stats request message received
*/
void processStatsRequest(Channel channel, OFMessage msg);
/**
* Processes a role request from the controllers.
* Targeted message type is ROLE_REQUEST.
*
* @param channel received channel
* @param msg role request message received
*/
void processRoleRequest(Channel channel, OFMessage msg);
/**
* Processes a features request from the controllers.
*
* @param channel received channel
* @param msg received features request
*/
void processFeaturesRequest(Channel channel, OFMessage msg);
/**
* Processes LLDP packets from the controller.
*
* @param channel received channel
* @param msg packet out message with lldp
*/
void processLldp(Channel channel, OFMessage msg);
/**
* Sends hello to the controller.
*
* @param channel received channel
*/
void sendOfHello(Channel channel);
/**
* Processes echo request from the controllers.
*
* @param channel received channel
* @param msg echo request message
*/
void processEchoRequest(Channel channel, OFMessage msg);
} }

View File

@ -15,111 +15,134 @@
*/ */
package org.onosproject.ofagent.impl; package org.onosproject.ofagent.impl;
import io.netty.channel.nio.NioEventLoopGroup; import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import org.onosproject.incubator.net.virtual.NetworkId; import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.ofagent.api.OFAgent; import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFController; import org.onosproject.ofagent.api.OFController;
import org.onosproject.ofagent.api.OFSwitch;
import java.util.Map; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import static com.google.common.base.Preconditions.checkNotNull;
/** /**
* Implementation of OF agent. * Implementation of OpenFlow agent.
*/ */
public final class DefaultOFAgent implements OFAgent { public final class DefaultOFAgent implements OFAgent {
private final NetworkId networkId; private final NetworkId networkId;
private final Map<Class<?>, Object> services;
private final Set<OFController> controllers; private final Set<OFController> controllers;
private final ExecutorService eventExecutor; private final State state;
private final NioEventLoopGroup ioWorker;
private final ConcurrentHashMap<DeviceId, OFSwitch> switchMap = new ConcurrentHashMap<>();
private final DeviceListener deviceListener = new InternalDeviceListener();
private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener();
private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
private DefaultOFAgent(NetworkId networkId, private DefaultOFAgent(NetworkId networkId,
Map<Class<?>, Object> services,
Set<OFController> controllers, Set<OFController> controllers,
ExecutorService eventExecutor, State state) {
NioEventLoopGroup ioWorker) {
this.networkId = networkId; this.networkId = networkId;
this.services = services;
this.controllers = controllers; this.controllers = controllers;
this.eventExecutor = eventExecutor; this.state = state;
this.ioWorker = ioWorker;
} }
@Override @Override
public NetworkId networkId() { public NetworkId networkId() {
return null; return networkId;
} }
@Override @Override
public Set<OFController> controllers() { public Set<OFController> controllers() {
return null; return controllers;
} }
@Override @Override
public void start() { public State state() {
// TODO add listeners to the services return state;
// TODO connect all virtual devices in this network to the controllers
} }
@Override @Override
public void stop() { public int hashCode() {
// TODO remove listeners from the services return Objects.hash(networkId);
// TODO disconnect all active connections
} }
private void connect(OFSwitch ofSwitch, OFController controller) { @Override
// TODO connect the switch to the controller public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof DefaultOFAgent) {
DefaultOFAgent that = (DefaultOFAgent) obj;
if (Objects.equals(networkId, that.networkId)) {
return true;
}
}
return false;
} }
private void disconnect(OFSwitch ofSwitch, OFController controller) { @Override
// TODO disconnect the controller from the ofSwitch public String toString() {
return MoreObjects.toStringHelper(this)
.add("networkId", this.networkId)
.add("controllers", this.controllers)
.add("state", this.state)
.toString();
} }
private class InternalFlowRuleListener implements FlowRuleListener { /**
* Returns new builder instance.
*
* @return default ofagent builder
*/
public static Builder builder() {
return new Builder();
}
/**
* Returns new builder instance from the existing agent.
*
* @param ofAgent the existing agent
* @return default ofagent builder
*/
public static Builder builder(OFAgent ofAgent) {
return new Builder()
.networkId(ofAgent.networkId())
.controllers(ofAgent.controllers())
.state(ofAgent.state());
}
public static final class Builder implements OFAgent.Builder {
private NetworkId networkId;
private Set<OFController> controllers;
private State state;
private Builder() {
}
@Override @Override
public void event(FlowRuleEvent event) { public OFAgent build() {
// TODO handle flow rule event checkNotNull(networkId, "Network ID cannot be null");
} checkNotNull(state, "State cannot be null");
} controllers = controllers == null ? ImmutableSet.of() : controllers;
private class InternalDeviceListener implements DeviceListener { return new DefaultOFAgent(networkId, controllers, state);
}
@Override @Override
public void event(DeviceEvent event) { public Builder networkId(NetworkId networkId) {
// TODO handle device event this.networkId = networkId;
// device detected: connect the device to controllers return this;
// device removed: disconnect and remove the switch from the map
// device state available: connect the switch to the controllers
// device state unavailable: disconnect the switch from the controllers
// port added: send out features reply
// port status change
} }
}
private class InternalPacketProcessor implements PacketProcessor {
@Override @Override
public void process(PacketContext context) { public Builder controllers(Set<OFController> controllers) {
// TODO handle packet-in this.controllers = controllers;
return this;
}
@Override
public Builder state(State state) {
this.state = state;
return this;
} }
} }
// TODO implement builder
} }

View File

@ -15,22 +15,41 @@
*/ */
package org.onosproject.ofagent.impl; package org.onosproject.ofagent.impl;
import com.google.common.base.MoreObjects;
import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort; import org.onlab.packet.TpPort;
import org.onosproject.ofagent.api.OFController; import org.onosproject.ofagent.api.OFController;
/** import java.util.Objects;
* Implementation of tenant openflow controller.
*/
public class DefaultOFController implements OFController {
private IpAddress ip;
private TpPort port;
public DefaultOFController(IpAddress ip, TpPort port) { import static com.google.common.base.Preconditions.checkNotNull;
/**
* Implementation of the default OpenFlow controller.
*/
public final class DefaultOFController implements OFController {
private final IpAddress ip;
private final TpPort port;
private DefaultOFController(IpAddress ip, TpPort port) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
} }
/**
* Returns new OpenFlow controller with the supplied IP and port.
*
* @param ip ip address
* @param port port number
* @return openflow controller
*/
public static DefaultOFController of(IpAddress ip, TpPort port) {
checkNotNull(ip, "Controller IP address cannot be null");
checkNotNull(port, "Controller port address cannot be null");
return new DefaultOFController(ip, port);
}
@Override @Override
public IpAddress ip() { public IpAddress ip() {
return ip; return ip;
@ -40,4 +59,33 @@ public class DefaultOFController implements OFController {
public TpPort port() { public TpPort port() {
return port; return port;
} }
@Override
public int hashCode() {
return Objects.hash(ip, port);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof DefaultOFController) {
DefaultOFController that = (DefaultOFController) obj;
if (Objects.equals(ip, that.ip) &&
Objects.equals(port, that.port)) {
return true;
}
}
return false;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("ip", this.ip)
.add("port", this.port)
.toString();
}
} }

View File

@ -15,10 +15,8 @@
*/ */
package org.onosproject.ofagent.impl; package org.onosproject.ofagent.impl;
import com.google.common.collect.Lists; import com.google.common.collect.ImmutableSet;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port; import org.onosproject.net.Port;
import org.onosproject.net.flow.FlowRule; import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.packet.InboundPacket; import org.onosproject.net.packet.InboundPacket;
@ -35,7 +33,7 @@ import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFVersion; import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.types.DatapathId; import org.projectfloodlight.openflow.types.DatapathId;
import java.util.List; import java.util.Collections;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -44,7 +42,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.projectfloodlight.openflow.protocol.OFControllerRole.*; import static org.projectfloodlight.openflow.protocol.OFControllerRole.*;
/** /**
* Implementation of OF switch. * Implementation of the default OpenFlow switch.
*/ */
public final class DefaultOFSwitch implements OFSwitch { public final class DefaultOFSwitch implements OFSwitch {
@ -53,49 +51,34 @@ public final class DefaultOFSwitch implements OFSwitch {
private static final long NUM_BUFFERS = 1024; private static final long NUM_BUFFERS = 1024;
private static final short NUM_TABLES = 3; private static final short NUM_TABLES = 3;
private final Device device; private final DatapathId dpId;
private final OFSwitchCapabilities capabilities; private final OFSwitchCapabilities capabilities;
private final DatapathId datapathId;
private final ConcurrentHashMap<Channel, OFControllerRole> controllerRoleMap private final ConcurrentHashMap<Channel, OFControllerRole> controllerRoleMap
= new ConcurrentHashMap<>(); = new ConcurrentHashMap<>();
private static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13); private int handshakeTransactionIds = -1;
private int handshakeTransactionIds;
public DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) { private DefaultOFSwitch(DatapathId dpid, OFSwitchCapabilities capabilities) {
this.device = device; this.dpId = dpid;
this.capabilities = capabilities; this.capabilities = capabilities;
datapathId = getDpidFromDeviceId(device.id());
handshakeTransactionIds = -1;
} }
// TODO add builder public static DefaultOFSwitch of(DatapathId dpid, OFSwitchCapabilities capabilities) {
checkNotNull(dpid, "DPID cannot be null");
checkNotNull(capabilities, "OF capabilities cannot be null");
return new DefaultOFSwitch(dpid, capabilities);
}
@Override @Override
public Device device() { public DatapathId dpid() {
return device; return this.dpId;
} }
@Override @Override
public OFSwitchCapabilities capabilities() { public OFSwitchCapabilities capabilities() {
return capabilities; return this.capabilities;
}
@Override
public boolean isConnected() {
return !controllerChannels().isEmpty();
}
@Override
public void started() {
// TODO do some initial setups
}
@Override
public void stopped() {
// TODO implement
} }
@Override @Override
@ -136,7 +119,7 @@ public final class DefaultOFSwitch implements OFSwitch {
@Override @Override
public Set<Channel> controllerChannels() { public Set<Channel> controllerChannels() {
return null; return ImmutableSet.copyOf(controllerRoleMap.keySet());
} }
@Override @Override
@ -181,19 +164,14 @@ public final class DefaultOFSwitch implements OFSwitch {
@Override @Override
public void processFeaturesRequest(Channel channel, OFMessage msg) { public void processFeaturesRequest(Channel channel, OFMessage msg) {
// TODO process features request and send reply OFFeaturesReply ofFeaturesReply = FACTORY.buildFeaturesReply()
List<OFMessage> ofMessageList = Lists.newArrayList(); .setDatapathId(dpId)
OFFeaturesReply.Builder frBuilder = FACTORY.buildFeaturesReply()
.setDatapathId(datapathId)
.setNBuffers(NUM_BUFFERS) .setNBuffers(NUM_BUFFERS)
.setNTables(NUM_TABLES) .setNTables(NUM_TABLES)
.setCapabilities(capabilities.ofSwitchCapabilities()) .setCapabilities(capabilities.ofSwitchCapabilities())
.setXid(msg.getXid()); .setXid(msg.getXid())
.build();
ofMessageList.add(frBuilder.build()); channel.writeAndFlush(Collections.singletonList(ofFeaturesReply));
channel.write(ofMessageList);
} }
@Override @Override
@ -203,38 +181,18 @@ public final class DefaultOFSwitch implements OFSwitch {
@Override @Override
public void sendOfHello(Channel channel) { public void sendOfHello(Channel channel) {
List<OFMessage> ofMessageList = Lists.newArrayList(); OFHello ofHello = FACTORY.buildHello()
OFHello.Builder ofHello = FACTORY.buildHello() .setXid(this.handshakeTransactionIds--)
.setXid(this.handshakeTransactionIds--); .build();
channel.writeAndFlush(Collections.singletonList(ofHello));
ofMessageList.add(ofHello.build());
channel.write(ofMessageList);
} }
@Override @Override
public void processEchoRequest(Channel channel, OFMessage msg) { public void processEchoRequest(Channel channel, OFMessage msg) {
List<OFMessage> ofMessageList = Lists.newArrayList(); OFEchoReply ofEchoReply = FACTORY.buildEchoReply()
OFEchoReply.Builder echoBuilder = FACTORY.buildEchoReply()
.setXid(msg.getXid()) .setXid(msg.getXid())
.setData(((OFEchoRequest) msg).getData()); .setData(((OFEchoRequest) msg).getData())
.build();
ofMessageList.add(echoBuilder.build()); channel.writeAndFlush(Collections.singletonList(ofEchoReply));
channel.write(ofMessageList);
}
private DatapathId getDpidFromDeviceId(DeviceId deviceId) {
String deviceIdToString = deviceId.toString().split(":")[1];
assert (deviceIdToString.length() == 16);
String resultedHexString = new String();
for (int i = 0; i < 8; i++) {
resultedHexString = resultedHexString + deviceIdToString.charAt(2 * i)
+ deviceIdToString.charAt(2 * i + 1);
if (i != 7) {
resultedHexString += ":";
}
}
return DatapathId.of(resultedHexString);
} }
} }

View File

@ -0,0 +1,208 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.impl;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFAgentEvent;
import org.onosproject.ofagent.api.OFAgentEvent.Type;
import org.onosproject.ofagent.api.OFAgentStore;
import org.onosproject.ofagent.api.OFAgentStoreDelegate;
import org.onosproject.ofagent.api.OFController;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
import static org.onosproject.ofagent.api.OFAgentEvent.Type.*;
import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the {@link OFAgentStore} with consistent map.
*/
@Service
@Component(immediate = true)
public class DistributedOFAgentStore extends AbstractStore<OFAgentEvent, OFAgentStoreDelegate>
implements OFAgentStore {
private final Logger log = getLogger(getClass());
private static final String ERR_NOT_FOUND = " does not exist";
private static final String ERR_DUPLICATE = " already exists";
private static final KryoNamespace SERIALIZER_OFAGENT = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(OFAgent.class)
.register(OFAgent.State.class)
.register(NetworkId.class)
.register(DefaultOFAgent.class)
.register(OFController.class)
.register(DefaultOFController.class)
.build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final MapEventListener<NetworkId, OFAgent> ofAgentMapListener = new OFAgentMapListener();
private ConsistentMap<NetworkId, OFAgent> ofAgentStore;
@Activate
protected void activate() {
ApplicationId appId = coreService.registerApplication(APPLICATION_NAME);
ofAgentStore = storageService.<NetworkId, OFAgent>consistentMapBuilder()
.withSerializer(Serializer.using(SERIALIZER_OFAGENT))
.withName("ofagentstore")
.withApplicationId(appId)
.build();
ofAgentStore.addListener(ofAgentMapListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
ofAgentStore.removeListener(ofAgentMapListener);
eventExecutor.shutdown();
log.info("Stopped");
}
@Override
public void createOfAgent(OFAgent ofAgent) {
ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
final String error = ofAgent.networkId() + ERR_DUPLICATE;
checkArgument(existing == null, error);
return ofAgent;
});
}
@Override
public void updateOfAgent(OFAgent ofAgent) {
ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
final String error = ofAgent.networkId() + ERR_NOT_FOUND;
checkArgument(existing != null, error);
return ofAgent;
});
}
@Override
public OFAgent removeOfAgent(NetworkId networkId) {
Versioned<OFAgent> ofAgent = ofAgentStore.remove(networkId);
return ofAgent == null ? null : ofAgent.value();
}
@Override
public OFAgent ofAgent(NetworkId networkId) {
Versioned<OFAgent> ofAgent = ofAgentStore.get(networkId);
return ofAgent == null ? null : ofAgent.value();
}
@Override
public Set<OFAgent> ofAgents() {
Set<OFAgent> ofAgents = ofAgentStore.values().stream()
.map(Versioned::value)
.collect(Collectors.toSet());
return ImmutableSet.copyOf(ofAgents);
}
private class OFAgentMapListener implements MapEventListener<NetworkId, OFAgent> {
@Override
public void event(MapEvent<NetworkId, OFAgent> event) {
switch (event.type()) {
case INSERT:
eventExecutor.execute(() -> {
log.debug("OFAgent for network {} created", event.key());
notifyDelegate(new OFAgentEvent(
Type.OFAGENT_CREATED,
event.newValue().value()));
});
break;
case UPDATE:
eventExecutor.execute(() -> {
log.debug("OFAgent for network {} updated", event.key());
processUpdated(event.oldValue().value(), event.newValue().value());
});
break;
case REMOVE:
eventExecutor.execute(() -> {
log.debug("OFAgent for network {} removed", event.key());
notifyDelegate(new OFAgentEvent(
Type.OFAGENT_REMOVED,
event.oldValue().value()));
});
break;
default:
break;
}
}
private void processUpdated(OFAgent oldValue, OFAgent newValue) {
if (!oldValue.controllers().equals(newValue.controllers())) {
oldValue.controllers().stream()
.filter(controller -> !newValue.controllers().contains(controller))
.forEach(controller -> notifyDelegate(new OFAgentEvent(
OFAGENT_CONTROLLER_REMOVED,
newValue,
controller)
));
newValue.controllers().stream()
.filter(controller -> !oldValue.controllers().contains(controller))
.forEach(controller -> notifyDelegate(new OFAgentEvent(
OFAGENT_CONTROLLER_ADDED,
newValue,
controller
)));
}
if (oldValue.state() != newValue.state()) {
Type eventType = newValue.state() == STARTED ? OFAGENT_STARTED : OFAGENT_STOPPED;
notifyDelegate(new OFAgentEvent(eventType, newValue));
}
}
}
}

View File

@ -15,99 +15,258 @@
*/ */
package org.onosproject.ofagent.impl; package org.onosproject.ofagent.impl;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service; import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.incubator.net.virtual.NetworkId; import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkEvent; import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
import org.onosproject.incubator.net.virtual.VirtualNetworkListener; import org.onosproject.incubator.net.virtual.VirtualNetworkListener;
import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFAgent; import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFAgentAdminService;
import org.onosproject.ofagent.api.OFAgentEvent;
import org.onosproject.ofagent.api.OFAgentListener;
import org.onosproject.ofagent.api.OFAgentService; import org.onosproject.ofagent.api.OFAgentService;
import org.onosproject.ofagent.api.OFController; import org.onosproject.ofagent.api.OFAgentStore;
import org.onosproject.ofagent.api.OFAgentStoreDelegate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool; import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
/** /**
* Implementation of OpenFlow agent service. * Implementation of OpenFlow agent service.
*/ */
@Component(immediate = true) @Component(immediate = true)
@Service @Service
public class OFAgentManager implements OFAgentService { public class OFAgentManager extends ListenerRegistry<OFAgentEvent, OFAgentListener>
implements OFAgentService, OFAgentAdminService {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
// TODO make it to be configurable with component config private static final String MSG_OFAGENT = "OFAgent for network %s %s";
private static final int NUM_OF_THREADS = 1; private static final String MSG_CREATED = "created";
private final ExecutorService eventExecutor = newFixedThreadPool( private static final String MSG_UPDATED = "updated";
NUM_OF_THREADS, private static final String MSG_REMOVED = "removed";
groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); private static final String MSG_IN_STARTED = "is already in active state, do nothing";
private static final String MSG_IN_STOPPED = "is already in inactive state, do nothing";
// TODO change it to ConsistentMap and support multi-instance mode private static final String ERR_NULL_OFAGENT = "OFAgent cannot be null";
private ConcurrentHashMap<NetworkId, OFAgent> agentMap = new ConcurrentHashMap<>(); private static final String ERR_NULL_NETID = "Network ID cannot be null";
private NioEventLoopGroup ioWorker; private static final String ERR_NOT_EXIST = "does not exist";
private static final String ERR_IN_USE = "is in start state, stop the agent first";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VirtualNetworkService virtualNetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OFAgentStore ofAgentStore;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
private final VirtualNetworkListener virtualNetListener = new InternalVirtualNetworkListener();
private final OFAgentStoreDelegate delegate = new InternalOFAgentStoreDelegate();
private ApplicationId appId;
private NodeId localId;
@Activate @Activate
protected void activate() { protected void activate() {
// TODO listen to the virtual network event appId = coreService.registerApplication(APPLICATION_NAME);
ioWorker = new NioEventLoopGroup(); localId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
ofAgentStore.setDelegate(delegate);
virtualNetService.addListener(virtualNetListener);
leadershipService.addListener(leadershipListener);
log.info("Started"); log.info("Started");
} }
@Deactivate @Deactivate
protected void deactivate() { protected void deactivate() {
ioWorker.shutdownGracefully(); leadershipService.removeListener(leadershipListener);
virtualNetService.removeListener(virtualNetListener);
ofAgentStore.unsetDelegate(delegate);
ofAgentStore.ofAgents().forEach(ofAgent -> stopAgent(ofAgent.networkId()));
eventExecutor.shutdown(); eventExecutor.shutdown();
leadershipService.withdraw(appId.name());
log.info("Stopped"); log.info("Stopped");
} }
@Override @Override
public Set<OFAgent> agents() { public void createAgent(OFAgent ofAgent) {
// TODO return existing agents checkNotNull(ofAgent, ERR_NULL_OFAGENT);
return null; if (ofAgent.state() == STARTED) {
log.warn(String.format(MSG_OFAGENT, ofAgent.networkId(), ERR_IN_USE));
return;
}
ofAgentStore.createOfAgent(ofAgent);
log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_CREATED));
} }
@Override @Override
public void createAgent(NetworkId networkId, OFController... controllers) { public void updateAgent(OFAgent ofAgent) {
// TODO create OFAgent instance with the given network ID, controllers checkNotNull(ofAgent, ERR_NULL_OFAGENT);
// TODO and device, flowRule, link, and packet service for the virtual network ofAgentStore.updateOfAgent(ofAgent);
// TODO start the OFAgent only if the virtual network state is active log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_UPDATED));
} }
@Override @Override
public void removeAgent(NetworkId networkId) { public void removeAgent(NetworkId networkId) {
// TODO stop and remove the OFAgent for the network checkNotNull(networkId, ERR_NULL_NETID);
synchronized (this) {
OFAgent existing = ofAgentStore.ofAgent(networkId);
if (existing == null) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
throw new IllegalStateException(error);
}
if (existing.state() == STARTED) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_IN_USE);
throw new IllegalStateException(error);
}
ofAgentStore.removeOfAgent(networkId);
log.info(String.format(MSG_OFAGENT, networkId, MSG_REMOVED));
}
} }
@Override @Override
public void startAgent(NetworkId networkId) { public void startAgent(NetworkId networkId) {
// TODO starts the agent for the network checkNotNull(networkId, ERR_NULL_NETID);
synchronized (this) {
OFAgent existing = ofAgentStore.ofAgent(networkId);
if (existing == null) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
throw new IllegalStateException(error);
}
if (existing.state() == STARTED) {
log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STARTED));
return;
}
OFAgent updated = DefaultOFAgent.builder(existing).state(STARTED).build();
ofAgentStore.updateOfAgent(updated);
}
} }
@Override @Override
public void stopAgent(NetworkId networkId) { public void stopAgent(NetworkId networkId) {
// TODO stops the agent for the network checkNotNull(networkId, ERR_NULL_NETID);
synchronized (this) {
OFAgent existing = ofAgentStore.ofAgent(networkId);
if (existing == null) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
throw new IllegalStateException(error);
}
if (existing.state() == STOPPED) {
log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STOPPED));
return;
}
OFAgent updated = DefaultOFAgent.builder(existing).state(STOPPED).build();
ofAgentStore.updateOfAgent(updated);
}
} }
@Override @Override
public boolean isActive(NetworkId networkId) { public Set<OFAgent> agents() {
// TODO manage the OF agent status return ofAgentStore.ofAgents();
return false; }
@Override
public OFAgent agent(NetworkId networkId) {
checkNotNull(networkId, ERR_NULL_NETID);
return ofAgentStore.ofAgent(networkId);
}
private class InternalLeadershipListener implements LeadershipEventListener {
@Override
public boolean isRelevant(LeadershipEvent event) {
// TODO check if local node is relevant to the leadership change event
return false;
}
@Override
public void event(LeadershipEvent event) {
switch (event.type()) {
case LEADER_CHANGED:
case LEADER_AND_CANDIDATES_CHANGED:
// TODO handle leadership changed events -> restart agents?
default:
break;
}
}
} }
private class InternalVirtualNetworkListener implements VirtualNetworkListener { private class InternalVirtualNetworkListener implements VirtualNetworkListener {
@Override
public boolean isRelevant(VirtualNetworkEvent event) {
// do not allow without leadership
return Objects.equals(localId, leadershipService.getLeader(appId.name()));
}
@Override @Override
public void event(VirtualNetworkEvent event) { public void event(VirtualNetworkEvent event) {
// TODO handle virtual network start and stop switch (event.type()) {
case NETWORK_UPDATED:
// TODO handle virtual network stopped -> stop agent
break;
case NETWORK_REMOVED:
// TODO remove related OFAgent -> stop agent
break;
case NETWORK_ADDED:
case VIRTUAL_DEVICE_ADDED:
case VIRTUAL_DEVICE_UPDATED:
case VIRTUAL_DEVICE_REMOVED:
case VIRTUAL_PORT_ADDED:
case VIRTUAL_PORT_UPDATED:
case VIRTUAL_PORT_REMOVED:
default:
// do nothing
break;
}
}
}
private class InternalOFAgentStoreDelegate implements OFAgentStoreDelegate {
@Override
public void notify(OFAgentEvent event) {
if (event != null) {
log.trace("send ofagent event {}", event);
process(event);
}
} }
} }
} }

View File

@ -15,39 +15,35 @@
*/ */
package org.onosproject.ofagent.impl; package org.onosproject.ofagent.impl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutException;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.osgi.ServiceDirectory;
import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFSwitch; import org.onosproject.ofagent.api.OFSwitch;
import org.projectfloodlight.openflow.protocol.OFErrorMsg; import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFactories;
import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFMessage; import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import static org.onosproject.ofagent.impl.OFChannelHandler.ChannelState.INIT;
/** /**
* Implementation of OpenFlow channel handler. * Implementation of OpenFlow channel handler.
* It processes OpenFlow message according to the channel state. * It processes OpenFlow message according to the channel state.
*/ */
public final class OFChannelHandler extends ChannelDuplexHandler { public final class OFChannelHandler extends ChannelDuplexHandler {
private static final String MSG_CHANNEL_STATE = "Set channel(%s) state: %s";
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private final OFSwitch ofSwitch; private final OFSwitch ofSwitch;
private ChannelHandlerContext ctx; private Channel channel;
private ChannelState state; private ChannelState state;
protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
protected VirtualNetworkService vNetService;
enum ChannelState { enum ChannelState {
@ -62,7 +58,6 @@ public final class OFChannelHandler extends ChannelDuplexHandler {
@Override @Override
void processOFMessage(final OFChannelHandler handler, void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) { final OFMessage msg) {
switch (msg.getType()) { switch (msg.getType()) {
case HELLO: case HELLO:
handler.setState(ChannelState.WAIT_FEATURE_REQUEST); handler.setState(ChannelState.WAIT_FEATURE_REQUEST);
@ -77,17 +72,16 @@ public final class OFChannelHandler extends ChannelDuplexHandler {
@Override @Override
void processOFMessage(final OFChannelHandler handler, void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) { final OFMessage msg) {
switch (msg.getType()) { switch (msg.getType()) {
case FEATURES_REQUEST: case FEATURES_REQUEST:
handler.ofSwitch.processFeaturesRequest(handler.ctx.channel(), msg); handler.ofSwitch.processFeaturesRequest(handler.channel, msg);
handler.setState(ChannelState.ESTABLISHED); handler.setState(ChannelState.ESTABLISHED);
break; break;
case ECHO_REQUEST: case ECHO_REQUEST:
handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg); handler.ofSwitch.processEchoRequest(handler.channel, msg);
break; break;
case ERROR: case ERROR:
handler.logErrorClose(handler.ctx, (OFErrorMsg) msg); handler.logErrorClose((OFErrorMsg) msg);
break; break;
default: default:
handler.illegalMessageReceived(msg); handler.illegalMessageReceived(msg);
@ -117,10 +111,10 @@ public final class OFChannelHandler extends ChannelDuplexHandler {
//TODO implement //TODO implement
break; break;
case ECHO_REQUEST: case ECHO_REQUEST:
handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg); handler.ofSwitch.processEchoRequest(handler.channel, msg);
break; break;
case ERROR: case ERROR:
handler.logErrorClose(handler.ctx, (OFErrorMsg) msg); handler.logErrorClose((OFErrorMsg) msg);
break; break;
default: default:
handler.unhandledMessageReceived(msg); handler.unhandledMessageReceived(msg);
@ -128,8 +122,8 @@ public final class OFChannelHandler extends ChannelDuplexHandler {
} }
} }
}; };
abstract void processOFMessage(final OFChannelHandler handler,
final OFMessage msg); abstract void processOFMessage(final OFChannelHandler handler, final OFMessage msg);
} }
/** /**
@ -140,47 +134,36 @@ public final class OFChannelHandler extends ChannelDuplexHandler {
public OFChannelHandler(OFSwitch ofSwitch) { public OFChannelHandler(OFSwitch ofSwitch) {
super(); super();
this.ofSwitch = ofSwitch; this.ofSwitch = ofSwitch;
setState(INIT);
setState(ChannelState.INIT);
ServiceDirectory services = new DefaultServiceDirectory();
vNetService = services.get(VirtualNetworkService.class);
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.channel = ctx.channel();
log.debug("Channel Active. Send OF_13 Hello to {}", ctx.channel().remoteAddress()); // FIXME move this to channel handler and add channel when OF handshake is done
ofSwitch.addControllerChannel(channel);
try { try {
ofSwitch.sendOfHello(ctx.channel()); ofSwitch.sendOfHello(channel);
log.trace("Send OF_13 Hello to {}", channel.remoteAddress());
setState(ChannelState.WAIT_HELLO); setState(ChannelState.WAIT_HELLO);
} catch (Throwable cause) { } catch (Exception ex) {
log.error("Exception occured because of{}", cause.getMessage()); log.error("Failed sending OF_13 Hello to {} for {}", channel.remoteAddress(), ex.getMessage());
} }
} }
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) public void channelInactive(ChannelHandlerContext ctx) {
throws Exception { ofSwitch.deleteControllerChannel(channel);
log.info("Device {} disconnected from controller {}", ofSwitch.dpid(), channel.remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try { try {
if (msg instanceof List) { state.processOFMessage(this, (OFMessage) msg);
((List) msg).forEach(ofm -> { } catch (Exception ex) {
state.processOFMessage(this, (OFMessage) ofm); ctx.fireExceptionCaught(ex);
});
} else {
state.processOFMessage(this, (OFMessage) msg);
}
} catch (Throwable cause) {
log.error("Exception occured {}", cause.getMessage());
} }
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
} }
@Override @Override
@ -188,41 +171,40 @@ public final class OFChannelHandler extends ChannelDuplexHandler {
if (cause instanceof ReadTimeoutException) { if (cause instanceof ReadTimeoutException) {
log.error("Connection closed because of ReadTimeoutException {}", cause.getMessage()); log.error("Connection closed because of ReadTimeoutException {}", cause.getMessage());
} else if (cause instanceof ClosedChannelException) { } else if (cause instanceof ClosedChannelException) {
log.error("ClosedChannelException occured"); log.error("ClosedChannelException occurred");
return; return;
} else if (cause instanceof RejectedExecutionException) { } else if (cause instanceof RejectedExecutionException) {
log.error("Could not process message: queue full"); log.error("Could not process message: queue full");
} else if (cause instanceof IOException) { } else if (cause instanceof IOException) {
log.error("IOException occured"); log.error("IOException occurred");
} else { } else {
log.error("Error while processing message from switch {}", cause.getMessage()); log.error("Error while processing message from switch {}", cause.getMessage());
} }
ctx.close(); channel.close();
} }
private void setState(ChannelState state) { private void setState(ChannelState state) {
this.state = state; this.state = state;
if (state != INIT) {
log.debug(String.format(MSG_CHANNEL_STATE, channel.remoteAddress(), state.name()));
}
} }
private void logErrorClose(ChannelHandlerContext ctx, OFErrorMsg errorMsg) { private void logErrorClose(OFErrorMsg errorMsg) {
log.error("{} from switch {} in state {}", log.error("{} from switch {} in state {}",
errorMsg, errorMsg,
ofSwitch.device().id().toString(), ofSwitch.dpid(),
state); state);
channel.close();
log.error("Disconnecting...");
ctx.close();
} }
private void illegalMessageReceived(OFMessage ofMessage) { private void illegalMessageReceived(OFMessage ofMessage) {
log.warn("Controller should never send this message {} in current state {}", log.warn("Controller should never send this message {} in current state {}",
ofMessage.getType().toString(), ofMessage.getType(), state);
state);
} }
private void unhandledMessageReceived(OFMessage ofMessage) { private void unhandledMessageReceived(OFMessage ofMessage) {
log.warn("Unhandled message {} received in state {}. Ignored", log.warn("Unexpected message {} received in state {}",
ofMessage.getType().toString(), ofMessage.getType(), state);
state);
} }
} }

View File

@ -39,7 +39,6 @@ public final class OFChannelInitializer extends ChannelInitializer<SocketChannel
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new OFMessageDecoder()) ch.pipeline().addLast(new OFMessageDecoder())
.addLast(new OFMessageEncoder()) .addLast(new OFMessageEncoder())
.addLast(new ReadTimeoutHandler(READ_TIMEOUT)) .addLast(new ReadTimeoutHandler(READ_TIMEOUT))

View File

@ -38,10 +38,17 @@ public final class OFConnectionHandler implements ChannelFutureListener {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private static final String MSG_STATE = "Device %s %s to controller %s:%s";
private static final String MSG_CONNECTING = "connecting";
private static final String MSG_CONNECTED = "connected";
private static final String MSG_FAILED = "failed to connect";
private final AtomicInteger retryCount; private final AtomicInteger retryCount;
private final OFSwitch ofSwitch; private final OFSwitch ofSwitch;
private final OFController controller; private final OFController controller;
private final EventLoopGroup workGroup; private final EventLoopGroup workGroup;
// TODO make this value configurable
private static final int MAX_RETRY = 3; private static final int MAX_RETRY = 3;
/** /**
@ -61,32 +68,40 @@ public final class OFConnectionHandler implements ChannelFutureListener {
/** /**
* Creates a connection to the supplied controller. * Creates a connection to the supplied controller.
*
*/ */
public void connect() { public void connect() {
SocketAddress remoteAddr = new InetSocketAddress(
SocketAddress remoteAddr = new InetSocketAddress(controller.ip().toInetAddress(), controller.port().toInt()); controller.ip().toInetAddress(), controller.port().toInt());
log.debug("Connecting to controller {}:{}", controller.ip(), controller.port());
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup) bootstrap.group(workGroup)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_KEEPALIVE, true)
.handler(new OFChannelInitializer(ofSwitch)); .handler(new OFChannelInitializer(ofSwitch));
log.debug(String.format(MSG_STATE,
ofSwitch.dpid(),
MSG_CONNECTING,
controller.ip(),
controller.port()));
bootstrap.connect(remoteAddr).addListener(this); bootstrap.connect(remoteAddr).addListener(this);
} }
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
ofSwitch.addControllerChannel(future.channel()); log.info(String.format(MSG_STATE,
log.debug("Connected to controller {}:{} for device {}", ofSwitch.dpid(),
controller.ip(), controller.port(), ofSwitch.device().id()); MSG_CONNECTED,
controller.ip(),
controller.port()));
} else { } else {
log.info("Failed to connect controller {}:{}. Retry...", controller.ip(), controller.port()); if (retryCount.getAndIncrement() > MAX_RETRY) {
if (retryCount.getAndIncrement() < MAX_RETRY) { log.warn(String.format(MSG_STATE,
ofSwitch.dpid(),
MSG_FAILED,
controller.ip(),
controller.port()));
} else {
this.connect(); this.connect();
} }
} }

View File

@ -32,22 +32,20 @@ import java.util.List;
public final class OFMessageDecoder extends ByteToMessageDecoder { public final class OFMessageDecoder extends ByteToMessageDecoder {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private final OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception { throws Exception {
if (!ctx.channel().isActive()) { if (!ctx.channel().isActive()) {
return; return;
} }
try { try {
OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
OFMessage message = reader.readFrom(in); OFMessage message = reader.readFrom(in);
out.add(message); out.add(message);
} catch (Throwable cause) { } catch (Throwable cause) {
log.error("Exception occured while processing decoding because of {}", cause.getMessage()); log.error("Failed decode OF message for {}", cause.getMessage());
} }
} }
} }

View File

@ -16,50 +16,24 @@
package org.onosproject.ofagent.impl; package org.onosproject.ofagent.impl;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import org.projectfloodlight.openflow.protocol.OFMessage; import org.projectfloodlight.openflow.protocol.OFMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Encodes OFMessage to a byte buffer. * Encodes OFMessage to a byte buffer.
*/ */
public final class OFMessageEncoder extends MessageToByteEncoder<Iterable<OFMessage>> { public final class OFMessageEncoder extends MessageToByteEncoder<Iterable<OFMessage>> {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override @Override
protected void encode(ChannelHandlerContext ctx, Iterable<OFMessage> msgList, ByteBuf out) protected void encode(ChannelHandlerContext ctx, Iterable<OFMessage> msgList, ByteBuf out)
throws Exception { throws Exception {
if (!ctx.channel().isActive()) { if (!ctx.channel().isActive()) {
return; return;
} }
if (msgList instanceof Iterable) { for (OFMessage ofm : msgList) {
msgList.forEach(msg -> { ofm.writeTo(out);
try {
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer();
msg.writeTo(byteBuf);
ctx.writeAndFlush(byteBuf);
} catch (Exception e) {
log.error("error occured because of {}", e.getMessage());
}
});
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof EncoderException) {
log.error("Connection closed because of EncoderException {}", cause.getMessage());
ctx.close();
} else {
log.error("Exception occured while processing encoding because of {}", cause.getMessage());
ctx.close();
} }
} }
} }

View File

@ -0,0 +1,372 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.impl;
import com.google.common.collect.ImmutableSet;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFAgentEvent;
import org.onosproject.ofagent.api.OFAgentListener;
import org.onosproject.ofagent.api.OFAgentService;
import org.onosproject.ofagent.api.OFController;
import org.onosproject.ofagent.api.OFSwitch;
import org.onosproject.ofagent.api.OFSwitchCapabilities;
import org.onosproject.ofagent.api.OFSwitchService;
import org.projectfloodlight.openflow.types.DatapathId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
/**
* Manages OF switches.
*/
@Component(immediate = true)
@Service
public class OFSwitchManager implements OFSwitchService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final OFSwitchCapabilities DEFAULT_CAPABILITIES = DefaultOFSwitchCapabilities.builder()
.flowStats()
.tableStats()
.portStats()
.groupStats()
.queueStats()
.ipReasm()
.portBlocked()
.build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VirtualNetworkService virtualNetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OFAgentService ofAgentService;
private final ConcurrentHashMap<DeviceId, OFSwitch> ofSwitchMap = new ConcurrentHashMap<>();
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final OFAgentListener ofAgentListener = new InternalOFAgentListener();
private final DeviceListener deviceListener = new InternalDeviceListener();
private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener();
private final PacketProcessor packetProcessor = new InternalPacketProcessor();
private NioEventLoopGroup ioWorker;
private ApplicationId appId;
private NodeId localId;
@Activate
protected void activate() {
appId = coreService.registerApplication(APPLICATION_NAME);
localId = clusterService.getLocalNode().id();
ioWorker = new NioEventLoopGroup();
ofAgentService.addListener(ofAgentListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
ofAgentService.removeListener(ofAgentListener);
ofAgentService.agents().forEach(this::processOFAgentStopped);
ioWorker.shutdownGracefully();
eventExecutor.shutdown();
log.info("Stopped");
}
@Override
public Set<OFSwitch> ofSwitches() {
return ImmutableSet.copyOf(ofSwitchMap.values());
}
@Override
public Set<OFSwitch> ofSwitches(NetworkId networkId) {
Set<OFSwitch> ofSwitches = devices(networkId).stream()
.map(ofSwitchMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
return ImmutableSet.copyOf(ofSwitches);
}
private void addOFSwitch(DeviceId deviceId) {
OFSwitch ofSwitch = DefaultOFSwitch.of(
dpidWithDeviceId(deviceId),
DEFAULT_CAPABILITIES);
ofSwitchMap.put(deviceId, ofSwitch);
log.debug("Added virtual OF switch for {}", deviceId);
// TODO connect controllers if the agent is in started state
}
private void deleteOFSwitch(DeviceId deviceId) {
// TODO disconnect switch if it has active connection
OFSwitch ofSwitch = ofSwitchMap.remove(deviceId);
if (ofSwitch != null) {
log.debug("Removed virtual OFSwitch for {}", deviceId);
}
}
private void connectController(OFSwitch ofSwitch, Set<OFController> controllers) {
controllers.forEach(controller -> {
OFConnectionHandler connectionHandler = new OFConnectionHandler(
ofSwitch,
controller,
ioWorker);
connectionHandler.connect();
});
log.debug("Connection requested for {}, controller:{}", ofSwitch.dpid(), controllers);
}
private void disconnectController(OFSwitch ofSwitch, Set<OFController> controllers) {
Set<SocketAddress> controllerAddrs = controllers.stream()
.map(ctrl -> new InetSocketAddress(ctrl.ip().toInetAddress(), ctrl.port().toInt()))
.collect(Collectors.toSet());
ofSwitch.controllerChannels().stream()
.filter(channel -> controllerAddrs.contains(channel.remoteAddress()))
.forEach(ChannelOutboundInvoker::disconnect);
log.debug("Disconnection requested for {}, controller:{}", ofSwitch.dpid(), controllers);
}
private Set<DeviceId> devices(NetworkId networkId) {
DeviceService deviceService = virtualNetService.get(
networkId,
DeviceService.class);
Set<DeviceId> deviceIds = Tools.stream(deviceService.getAvailableDevices())
.map(Device::id)
.collect(Collectors.toSet());
return ImmutableSet.copyOf(deviceIds);
}
private DatapathId dpidWithDeviceId(DeviceId deviceId) {
String strDeviceId = deviceId.toString().split(":")[1];
checkArgument(strDeviceId.length() == 16, "Invalid device ID " + strDeviceId);
String resultedHexString = "";
for (int i = 0; i < 8; i++) {
resultedHexString = resultedHexString + strDeviceId.charAt(2 * i)
+ strDeviceId.charAt(2 * i + 1);
if (i != 7) {
resultedHexString += ":";
}
}
return DatapathId.of(resultedHexString);
}
private void processOFAgentCreated(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(this::addOFSwitch);
DeviceService deviceService = virtualNetService.get(
ofAgent.networkId(),
DeviceService.class);
deviceService.addListener(deviceListener);
}
private void processOFAgentRemoved(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(this::deleteOFSwitch);
DeviceService deviceService = virtualNetService.get(
ofAgent.networkId(),
DeviceService.class);
deviceService.removeListener(deviceListener);
}
private void processOFAgentStarted(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(deviceId -> {
OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
if (ofSwitch != null) {
connectController(ofSwitch, ofAgent.controllers());
}
});
PacketService packetService = virtualNetService.get(
ofAgent.networkId(),
PacketService.class);
packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
FlowRuleService flowRuleService = virtualNetService.get(
ofAgent.networkId(),
FlowRuleService.class);
flowRuleService.addListener(flowRuleListener);
}
private void processOFAgentStopped(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(deviceId -> {
OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
if (ofSwitch != null) {
disconnectController(ofSwitch, ofAgent.controllers());
}
});
PacketService packetService = virtualNetService.get(
ofAgent.networkId(),
PacketService.class);
packetService.removeProcessor(packetProcessor);
FlowRuleService flowRuleService = virtualNetService.get(
ofAgent.networkId(),
FlowRuleService.class);
flowRuleService.removeListener(flowRuleListener);
}
private class InternalOFAgentListener implements OFAgentListener {
@Override
public boolean isRelevant(OFAgentEvent event) {
return Objects.equals(localId, leadershipService.getLeader(appId.name()));
}
@Override
public void event(OFAgentEvent event) {
switch (event.type()) {
case OFAGENT_CREATED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent created: {}", ofAgent);
processOFAgentCreated(ofAgent);
});
break;
case OFAGENT_REMOVED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent removed: {}", ofAgent);
processOFAgentRemoved(ofAgent);
});
break;
case OFAGENT_CONTROLLER_ADDED:
// TODO handle additional controller
break;
case OFAGENT_CONTROLLER_REMOVED:
// TODO handle removed controller
break;
case OFAGENT_STARTED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent started: {}", ofAgent);
processOFAgentStarted(ofAgent);
});
break;
case OFAGENT_STOPPED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent stopped: {}", ofAgent);
processOFAgentStopped(ofAgent);
});
break;
default:
// do nothing
break;
}
}
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_ADDED:
eventExecutor.execute(() -> {
Device device = event.subject();
log.debug("Processing device added: {}", device);
addOFSwitch(device.id());
});
break;
case DEVICE_REMOVED:
eventExecutor.execute(() -> {
Device device = event.subject();
log.debug("Processing device added: {}", device);
deleteOFSwitch(device.id());
});
break;
case DEVICE_AVAILABILITY_CHANGED:
// TODO handle event
break;
case DEVICE_UPDATED:
case DEVICE_SUSPENDED:
case PORT_ADDED:
// TODO handle event
case PORT_REMOVED:
// TODO handle event
case PORT_STATS_UPDATED:
case PORT_UPDATED:
default:
break;
}
}
}
private class InternalPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
// TODO handle packet-in
}
}
private class InternalFlowRuleListener implements FlowRuleListener {
@Override
public void event(FlowRuleEvent event) {
// TODO handle flow rule event
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.impl;
import com.google.common.collect.Sets;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFController;
import java.util.Set;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
/**
* Unit test of DefaultOFAgent model entity.
*/
public class DefaultOFAgentTest {
private static final Set<OFController> CONTROLLER_1 = Sets.newHashSet(
DefaultOFController.of(
IpAddress.valueOf("192.168.0.3"),
TpPort.tpPort(6653)));
private static final Set<OFController> CONTROLLER_2 = Sets.newHashSet(
DefaultOFController.of(
IpAddress.valueOf("192.168.0.3"),
TpPort.tpPort(6653)),
DefaultOFController.of(
IpAddress.valueOf("192.168.0.4"),
TpPort.tpPort(6653)));
private static final NetworkId NETWORK_1 = NetworkId.networkId(1);
private static final NetworkId NETWORK_2 = NetworkId.networkId(2);
private static final OFAgent OFAGENT = DefaultOFAgent.builder()
.networkId(NETWORK_1)
.controllers(CONTROLLER_1)
.state(STOPPED)
.build();
private static final OFAgent SAME_AS_OFAGENT_1 = DefaultOFAgent.builder()
.networkId(NETWORK_1)
.controllers(CONTROLLER_2)
.state(STOPPED)
.build();
private static final OFAgent SAME_AS_OFAGENT_2 = DefaultOFAgent.builder()
.networkId(NETWORK_1)
.controllers(CONTROLLER_1)
.state(STARTED)
.build();
private static final OFAgent ANOTHER_OFAGENT = DefaultOFAgent.builder()
.networkId(NETWORK_2)
.controllers(CONTROLLER_1)
.state(STOPPED)
.build();
@Test
public void testImmutability() {
assertThatClassIsImmutable(DefaultOFAgent.class);
}
@Test
public void testEquality() {
new EqualsTester()
.addEqualityGroup(OFAGENT, SAME_AS_OFAGENT_1, SAME_AS_OFAGENT_2)
.addEqualityGroup(ANOTHER_OFAGENT)
.testEquals();
}
}

View File

@ -0,0 +1,266 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestTools;
import org.onlab.junit.TestUtils;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.event.Event;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFAgentEvent;
import org.onosproject.ofagent.api.OFAgentListener;
import org.onosproject.ofagent.api.OFController;
import org.onosproject.store.service.TestStorageService;
import java.util.List;
import java.util.Set;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
import static org.onosproject.ofagent.api.OFAgentEvent.Type.*;
/**
* Junit tests for OFAgent target.
*/
public class OFAgentManagerTest {
private static final ApplicationId APP_ID = new DefaultApplicationId(1, "test");
private static final ControllerNode LOCAL_NODE =
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf("127.0.0.1"));
private static final Set<OFController> CONTROLLER_1 = Sets.newHashSet(
DefaultOFController.of(
IpAddress.valueOf("192.168.0.3"),
TpPort.tpPort(6653)));
private static final Set<OFController> CONTROLLER_2 = Sets.newHashSet(
DefaultOFController.of(
IpAddress.valueOf("192.168.0.3"),
TpPort.tpPort(6653)),
DefaultOFController.of(
IpAddress.valueOf("192.168.0.4"),
TpPort.tpPort(6653)));
private static final NetworkId NETWORK_1 = NetworkId.networkId(1);
private static final NetworkId NETWORK_2 = NetworkId.networkId(2);
private static final OFAgent OFAGENT_1 = DefaultOFAgent.builder()
.networkId(NETWORK_1)
.state(STOPPED)
.build();
private static final OFAgent OFAGENT_1_CTRL_1 = DefaultOFAgent.builder()
.networkId(NETWORK_1)
.controllers(CONTROLLER_1)
.state(STOPPED)
.build();
private static final OFAgent OFAGENT_1_CTRL_2 = DefaultOFAgent.builder()
.networkId(NETWORK_1)
.controllers(CONTROLLER_2)
.state(STOPPED)
.build();
private static final OFAgent OFAGENT_2 = DefaultOFAgent.builder()
.networkId(NETWORK_2)
.state(STOPPED)
.build();
private final TestOFAgentListener testListener = new TestOFAgentListener();
private final CoreService mockCoreService = createMock(CoreService.class);
private final LeadershipService mockLeadershipService = createMock(LeadershipService.class);
private final VirtualNetworkService mockVirtualNetService = createMock(VirtualNetworkService.class);
private final ClusterService mockClusterService = createMock(ClusterService.class);
private OFAgentManager target;
private DistributedOFAgentStore ofAgentStore;
@Before
public void setUp() throws Exception {
ofAgentStore = new DistributedOFAgentStore();
TestUtils.setField(ofAgentStore, "coreService", createMock(CoreService.class));
TestUtils.setField(ofAgentStore, "storageService", new TestStorageService());
ofAgentStore.activate();
expect(mockCoreService.registerApplication(anyObject()))
.andReturn(APP_ID)
.anyTimes();
replay(mockCoreService);
expect(mockClusterService.getLocalNode())
.andReturn(LOCAL_NODE)
.anyTimes();
replay(mockClusterService);
expect(mockLeadershipService.runForLeadership(anyObject()))
.andReturn(null)
.anyTimes();
mockLeadershipService.addListener(anyObject());
mockLeadershipService.removeListener(anyObject());
mockLeadershipService.withdraw(anyObject());
replay(mockLeadershipService);
target = new OFAgentManager();
target.coreService = mockCoreService;
target.leadershipService = mockLeadershipService;
target.virtualNetService = mockVirtualNetService;
target.clusterService = mockClusterService;
target.ofAgentStore = ofAgentStore;
target.addListener(testListener);
target.activate();
}
@After
public void tearDown() {
target.removeListener(testListener);
ofAgentStore.deactivate();
target.deactivate();
ofAgentStore = null;
target = null;
}
@Test
public void testCreateAndRemoveAgent() {
target.createAgent(OFAGENT_1);
Set<OFAgent> agents = target.agents();
assertEquals("OFAgent set size did not match", 1, agents.size());
target.createAgent(OFAGENT_2);
agents = target.agents();
assertEquals("OFAgent set size did not match", 2, agents.size());
target.removeAgent(NETWORK_1);
agents = target.agents();
assertEquals("OFAgent set size did not match", 1, agents.size());
target.removeAgent(NETWORK_2);
agents = target.agents();
assertEquals("OFAgent set size did not match", 0, agents.size());
validateEvents(OFAGENT_CREATED, OFAGENT_CREATED, OFAGENT_REMOVED, OFAGENT_REMOVED);
}
@Test(expected = NullPointerException.class)
public void testCreateNullAgent() {
target.createAgent(null);
}
@Test(expected = IllegalArgumentException.class)
public void testCreateDuplicateAgent() {
target.createAgent(OFAGENT_1);
target.createAgent(OFAGENT_1);
}
@Test(expected = NullPointerException.class)
public void testRemoveNullAgent() {
target.removeAgent(null);
}
@Test(expected = IllegalStateException.class)
public void testRemoveNotFoundAgent() {
target.removeAgent(NETWORK_1);
}
@Test(expected = IllegalStateException.class)
public void testRemoveStartedAgent() {
target.createAgent(OFAGENT_1);
target.startAgent(NETWORK_1);
target.removeAgent(NETWORK_1);
}
@Test
public void testStartAndStopAgent() {
target.createAgent(OFAGENT_1);
target.startAgent(NETWORK_1);
OFAgent ofAgent = target.agent(NETWORK_1);
assertEquals("OFAgent state did not match", STARTED, ofAgent.state());
target.stopAgent(NETWORK_1);
ofAgent = target.agent(NETWORK_1);
assertEquals("OFAgent state did not match", STOPPED, ofAgent.state());
validateEvents(OFAGENT_CREATED, OFAGENT_STARTED, OFAGENT_STOPPED);
}
@Test
public void testAddController() {
target.createAgent(OFAGENT_1);
target.updateAgent(OFAGENT_1_CTRL_1);
OFAgent ofAgent = target.agent(NETWORK_1);
assertEquals("OFAgent controller did not match", CONTROLLER_1, ofAgent.controllers());
target.updateAgent(OFAGENT_1_CTRL_2);
ofAgent = target.agent(NETWORK_1);
assertEquals("OFAgent controller did not match", CONTROLLER_2, ofAgent.controllers());
validateEvents(OFAGENT_CREATED, OFAGENT_CONTROLLER_ADDED, OFAGENT_CONTROLLER_ADDED);
}
@Test
public void testRemoveController() {
target.createAgent(OFAGENT_1_CTRL_2);
target.updateAgent(OFAGENT_1_CTRL_1);
OFAgent ofAgent = target.agent(NETWORK_1);
assertEquals("OFAgent controller did not match", CONTROLLER_1, ofAgent.controllers());
target.updateAgent(OFAGENT_1);
ofAgent = target.agent(NETWORK_1);
assertTrue("OFAgent controller did not match", ofAgent.controllers().isEmpty());
validateEvents(OFAGENT_CREATED, OFAGENT_CONTROLLER_REMOVED, OFAGENT_CONTROLLER_REMOVED);
}
private void validateEvents(Enum... types) {
TestTools.assertAfter(100, () -> {
int i = 0;
assertEquals("Number of events does not match", types.length, testListener.events.size());
for (Event event : testListener.events) {
assertEquals("Incorrect event received", types[i], event.type());
i++;
}
testListener.events.clear();
});
}
private static class TestOFAgentListener implements OFAgentListener {
private List<OFAgentEvent> events = Lists.newArrayList();
@Override
public void event(OFAgentEvent event) {
events.add(event);
}
}
}