diff --git a/apps/ofagent/BUCK b/apps/ofagent/BUCK index eeb3aba345..1c8ee1e605 100644 --- a/apps/ofagent/BUCK +++ b/apps/ofagent/BUCK @@ -1,19 +1,30 @@ COMPILE_DEPS = [ '//lib:CORE_DEPS', + '//core/store/serializers:onos-core-serializers', + '//core/common:onos-core-common', + '//incubator/api:onos-incubator-api', + '//cli:onos-cli', + '//lib:org.apache.karaf.shell.console', '//lib:netty-transport', '//lib:netty-buffer', '//lib:netty-codec', '//lib:netty-handler', - '//incubator/api:onos-incubator-api', '//lib:openflowj-3.0', ] +TEST_DEPS = [ + '//lib:TEST_ADAPTERS', + '//core/api:onos-api-tests', + '//core/common:onos-core-common-tests', +] + EXCLUDED_BUNDLES = [ '//lib:openflowj-3.0', ] osgi_jar_with_tests ( deps = COMPILE_DEPS, + test_deps = TEST_DEPS, ) onos_app ( diff --git a/apps/ofagent/pom.xml b/apps/ofagent/pom.xml index 1416332ea6..22e6996f9c 100644 --- a/apps/ofagent/pom.xml +++ b/apps/ofagent/pom.xml @@ -51,6 +51,71 @@ org.osgi org.osgi.compendium + + org.onosproject + onos-core-serializers + ${project.version} + + + org.onosproject + onos-cli + ${project.version} + + + org.apache.karaf.shell + org.apache.karaf.shell.console + + + org.apache.felix + org.apache.felix.scr.annotations + + + org.onosproject + onlab-osgi + ${project.version} + + + org.onosproject + onlab-misc + ${project.version} + + + org.onosproject + onos-core-common + ${project.version} + + + org.onosproject + onos-incubator-api + + + org.onosproject + onlab-junit + test + + + org.onosproject + onos-api + tests + test + + + org.onosproject + onos-core-common + tests + test + + + com.google.guava + guava-testlib + ${guava.version} + test + + + org.easymock + easymock + test + org.projectfloodlight openflowj @@ -58,27 +123,8 @@ io.netty - netty-transport - - - io.netty - netty-handler - - - io.netty - netty-buffer - - - io.netty - netty-codec - - - org.onosproject - onos-of-api - - - org.onosproject - onos-incubator-api + netty-all + ${netty4.version} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java index b44f48e006..0db4489c9b 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgent.java @@ -15,19 +15,29 @@ */ package org.onosproject.ofagent.api; -import io.netty.channel.nio.NioEventLoopGroup; import org.onosproject.incubator.net.virtual.NetworkId; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; /** - * Representation of an OF agent, which brokers virtual devices and external - * controllers by handling OpenFlow connections and messages between them. + * Representation of an OpenFlow agent, which holds the mapping between the virtual + * network and the external OpenFlow controllers. */ public interface OFAgent { + enum State { + + /** + * Specifies that the ofagent state is started. + */ + STARTED, + + /** + * Specifies that the ofagent state is stopped. + */ + STOPPED + } + /** * Returns the identifier of the virtual network that this agent cares for. * @@ -43,14 +53,11 @@ public interface OFAgent { Set controllers(); /** - * Starts the OpenFlow agent. + * Returns the admin state of the agent. + * + * @return state */ - void start(); - - /** - * Stops the OpenFlow agent. - */ - void stop(); + State state(); /** * Builder of OF agent entities. @@ -72,15 +79,6 @@ public interface OFAgent { */ Builder networkId(NetworkId networkId); - /** - * Returns OF agent builder with the supplied network services for the - * virtual network. - * - * @param services network services for the virtual network - * @return of agent builder - */ - Builder services(Map, Object> services); - /** * Returns OF agent builder with the supplied controllers. * @@ -90,19 +88,11 @@ public interface OFAgent { Builder controllers(Set controllers); /** - * Returns OF agent builder with the supplied event executor. + * Returns OF agent builder with the supplied state. * - * @param eventExecutor event executor + * @param state state of the agent * @return of agent builder */ - Builder eventExecutor(ExecutorService eventExecutor); - - /** - * Returns OF agent builder with the supplied IO work group. - * - * @param ioWorker io worker group - * @return of agent builder - */ - Builder ioWorker(NioEventLoopGroup ioWorker); + Builder state(State state); } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentAdminService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentAdminService.java new file mode 100644 index 0000000000..0c6667afa5 --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentAdminService.java @@ -0,0 +1,59 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.api; + +import org.onosproject.incubator.net.virtual.NetworkId; + +/** + * Service for administering the inventory of OpenFlow agents. + */ +public interface OFAgentAdminService { + + /** + * Creates an OpenFlow agent for a given virtual network with given controllers. + * + * @param ofAgent the new ofagent + */ + void createAgent(OFAgent ofAgent); + + /** + * Updates the agent. + * + * @param ofAgent updated ofagent + */ + void updateAgent(OFAgent ofAgent); + + /** + * Removes the OpenFlow agent for the given virtual network. + * + * @param networkId virtual network identifier + */ + void removeAgent(NetworkId networkId); + + /** + * Starts the agent for the given network. + * + * @param networkId virtual network identifier + */ + void startAgent(NetworkId networkId); + + /** + * Stops the agent for the given network. + * + * @param networkId virtual network identifier + */ + void stopAgent(NetworkId networkId); +} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentEvent.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentEvent.java new file mode 100644 index 0000000000..7515a90c93 --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentEvent.java @@ -0,0 +1,91 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.api; + +import org.onosproject.event.AbstractEvent; + +/** + * Describes OFAgent event. + */ +public class OFAgentEvent extends AbstractEvent { + + private final OFController controller; + + public enum Type { + + /** + * Signifies that a new OFAgent is created. + */ + OFAGENT_CREATED, + + /** + * Signifies that the OFAgent is removed. + */ + OFAGENT_REMOVED, + + /** + * Signifies that the new external controller is added. + */ + OFAGENT_CONTROLLER_ADDED, + + /** + * Signifies that the external controller is removed. + */ + OFAGENT_CONTROLLER_REMOVED, + + /** + * Signifies that the OFAgent is started. + */ + OFAGENT_STARTED, + + /** + * Signifies that the OFAgent is stopped. + */ + OFAGENT_STOPPED, + } + + /** + * Creates an event of a given type for the specified ofagent and the current time. + * + * @param type ofagent event type + * @param ofAgent ofagent instance + */ + public OFAgentEvent(OFAgentEvent.Type type, OFAgent ofAgent) { + super(type, ofAgent); + this.controller = null; + } + + /** + * Creates an event of a given type for the specified ofagent and the updated controller. + * + * @param type ofagent event type + * @param ofAgent ofagent instance + * @param controller updated external controller + */ + public OFAgentEvent(OFAgentEvent.Type type, OFAgent ofAgent, OFController controller) { + super(type, ofAgent); + this.controller = controller; + } + + /** + * Returns the updated controller. + * + * @return updated controller; null if the event is not controller related + */ + public OFController controller() { + return this.controller; + } +} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentListener.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentListener.java new file mode 100644 index 0000000000..9cc79820ec --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentListener.java @@ -0,0 +1,24 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.api; + +import org.onosproject.event.EventListener; + +/** + * Listener for OFAgent events. + */ +public interface OFAgentListener extends EventListener { +} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java index c2b5e47e9e..17422e1dd0 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentService.java @@ -15,6 +15,7 @@ */ package org.onosproject.ofagent.api; +import org.onosproject.event.ListenerService; import org.onosproject.incubator.net.virtual.NetworkId; import java.util.Set; @@ -22,7 +23,9 @@ import java.util.Set; /** * Service for administering OF agents for a virtual network. */ -public interface OFAgentService { +public interface OFAgentService extends ListenerService { + + String APPLICATION_NAME = "org.onosproject.ofagent"; /** * Returns the OpenFlow agent list. @@ -32,39 +35,10 @@ public interface OFAgentService { Set agents(); /** - * Creates an OpenFlow agent for a given virtual network with given controllers. - * - * @param networkId id of the virtual network - * @param controllers list of controllers - */ - void createAgent(NetworkId networkId, OFController... controllers); - - /** - * Removes the OpenFlow agent for the given virtual network. - * - * @param networkId virtual network identifier - */ - void removeAgent(NetworkId networkId); - - /** - * Starts the agent for the given network. - * - * @param networkId virtual network identifier - */ - void startAgent(NetworkId networkId); - - /** - * Stops the agent for the given network. - * - * @param networkId virtual network identifier - */ - void stopAgent(NetworkId networkId); - - /** - * Returns if the agent of the given network is active or not. + * Returns the agent for the given network. * * @param networkId network id - * @return true if the agent is active + * @return ofagent; null if no ofagent exists for the network */ - boolean isActive(NetworkId networkId); + OFAgent agent(NetworkId networkId); } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStore.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStore.java new file mode 100644 index 0000000000..aa42716fa2 --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStore.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.api; + +import org.onosproject.incubator.net.virtual.NetworkId; +import org.onosproject.store.Store; + +import java.util.Set; + +/** + * Manages inventory of OpenFlow agent states; not intended for direct use. + */ +public interface OFAgentStore extends Store { + + /** + * Creates the new openflow agent. + * + * @param ofAgent the new ofagent + */ + void createOfAgent(OFAgent ofAgent); + + /** + * Updates the openflow agent. + * + * @param ofAgent the updated ofagent + */ + void updateOfAgent(OFAgent ofAgent); + + /** + * Removes the openflow agent for the supplied network ID. + * + * @param networkId virtual network identifier + * @return removed agent; null if remove failed + */ + OFAgent removeOfAgent(NetworkId networkId); + + /** + * Returns the openflow agent with the supplied network ID. + * + * @param networkId virtual network identifier + * @return ofagent; null if no ofagent exists for the network + */ + OFAgent ofAgent(NetworkId networkId); + + /** + * Returns all openflow agents. + * + * @return set of ofagents; empty set if no ofagents exist + */ + Set ofAgents(); +} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStoreDelegate.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStoreDelegate.java new file mode 100644 index 0000000000..16596fc381 --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFAgentStoreDelegate.java @@ -0,0 +1,24 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.api; + +import org.onosproject.store.StoreDelegate; + +/** + * OFAgent network store delegate abstraction. + */ +public interface OFAgentStoreDelegate extends StoreDelegate { +} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java index 8ee6f64617..b8cabf9d03 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java @@ -15,19 +15,19 @@ */ package org.onosproject.ofagent.api; -import org.onosproject.net.Device; +import org.projectfloodlight.openflow.types.DatapathId; /** * Representation of virtual OpenFlow switch. */ -public interface OFSwitch extends OFSwitchService, OFControllerRoleService { +public interface OFSwitch extends OFSwitchOperationService, OFControllerRoleService { /** * Returns the device information. * * @return virtual device */ - Device device(); + DatapathId dpid(); /** * Returns the capabilities of the switch. @@ -35,11 +35,4 @@ public interface OFSwitch extends OFSwitchService, OFControllerRoleService { * @return capabilities */ OFSwitchCapabilities capabilities(); - - /** - * Returns if the switch is connected to controllers or not. - * - * @return true if the switch is connected, false otherwise - */ - boolean isConnected(); } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchOperationService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchOperationService.java new file mode 100644 index 0000000000..6413cba5ad --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchOperationService.java @@ -0,0 +1,129 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.api; + +import io.netty.channel.Channel; +import org.onosproject.net.Port; +import org.onosproject.net.flow.FlowRule; +import org.onosproject.net.packet.InboundPacket; +import org.projectfloodlight.openflow.protocol.OFMessage; + +/** + * Service for providing OpenFlow operations. + */ +public interface OFSwitchOperationService { + + /** + * Processes a new port of the switch. + * It sends out FEATURE_REPLY message to the controllers. + * + * @param port virtual port + */ + void processPortAdded(Port port); + + /** + * Processes port link down. + * It sends out PORT_STATUS asynchronous message to the controllers. + * + * @param port virtual port + */ + void processPortDown(Port port); + + /** + * Processes port link down. + * It sends out PORT_STATUS asynchronous message to the controllers. + * + * @param port virtual port + */ + void processPortUp(Port port); + + /** + * Processes flow removed. + * It sends out FLOW_REMOVED asynchronous message to the controllers. + * + * @param flowRule removed flow rule + */ + void processFlowRemoved(FlowRule flowRule); + + /** + * Processes packet in. + * It sends out PACKET_IN asynchronous message to the controllers. + * + * @param packet inbound packet + */ + void processPacketIn(InboundPacket packet); + + /** + * Processes commands from the controllers that modify the state of the switch. + * Possible message types include PACKET_OUT, FLOW_MOD, GROUP_MOD, + * PORT_MOD, TABLE_MOD. These types of messages can be denied based on a + * role of the request controller. + * + * @param channel received channel + * @param msg command message received + */ + void processControllerCommand(Channel channel, OFMessage msg); + + /** + * Processes a stats request from the controllers. + * Targeted message type is MULTIPART_REQUEST with FLOW, PORT, GROUP, + * GROUP_DESC subtypes. + * + * @param channel received channel + * @param msg stats request message received + */ + void processStatsRequest(Channel channel, OFMessage msg); + + /** + * Processes a role request from the controllers. + * Targeted message type is ROLE_REQUEST. + * + * @param channel received channel + * @param msg role request message received + */ + void processRoleRequest(Channel channel, OFMessage msg); + + /** + * Processes a features request from the controllers. + * + * @param channel received channel + * @param msg received features request + */ + void processFeaturesRequest(Channel channel, OFMessage msg); + + /** + * Processes LLDP packets from the controller. + * + * @param channel received channel + * @param msg packet out message with lldp + */ + void processLldp(Channel channel, OFMessage msg); + + /** + * Sends hello to the controller. + * + * @param channel received channel + */ + void sendOfHello(Channel channel); + + /** + * Processes echo request from the controllers. + * + * @param channel received channel + * @param msg echo request message + */ + void processEchoRequest(Channel channel, OFMessage msg); +} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java index 4f33b516af..898aede2bc 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java @@ -15,125 +15,27 @@ */ package org.onosproject.ofagent.api; -import io.netty.channel.Channel; -import org.onosproject.net.Port; -import org.onosproject.net.flow.FlowRule; -import org.onosproject.net.packet.InboundPacket; -import org.projectfloodlight.openflow.protocol.OFMessage; +import org.onosproject.incubator.net.virtual.NetworkId; + +import java.util.Set; /** - * Service providing OpenFlow switch operations. + * Service for providing virtual OpenFlow switch information. */ public interface OFSwitchService { /** - * Handles the switch starts. - */ - void started(); - - /** - * Handles the switch stops. - */ - void stopped(); - - /** - * Processes a new port of the switch. - * It sends out FEATURE_REPLY message to the controllers. + * Returns all openflow switches that OF agent service manages. * - * @param port virtual port + * @return set of openflow switches; empty set if no openflow switches exist */ - void processPortAdded(Port port); + Set ofSwitches(); /** - * Processes port link down. - * It sends out PORT_STATUS asynchronous message to the controllers. + * Returns all openflow switches for the specified network. * - * @param port virtual port + * @param networkId network id + * @return set of openflow switches; empty set if no devices exist on the network */ - void processPortDown(Port port); - - /** - * Processes port link down. - * It sends out PORT_STATUS asynchronous message to the controllers. - * - * @param port virtual port - */ - void processPortUp(Port port); - - /** - * Processes flow removed. - * It sends out FLOW_REMOVED asynchronous message to the controllers. - * - * @param flowRule removed flow rule - */ - void processFlowRemoved(FlowRule flowRule); - - /** - * Processes packet in. - * It sends out PACKET_IN asynchronous message to the controllers. - * - * @param packet inbound packet - */ - void processPacketIn(InboundPacket packet); - - /** - * Processes commands from the controllers that modify the state of the switch. - * Possible message types include PACKET_OUT, FLOW_MOD, GROUP_MOD, - * PORT_MOD, TABLE_MOD. These types of messages can be denied based on a - * role of the request controller. - * - * @param channel received channel - * @param msg command message received - */ - void processControllerCommand(Channel channel, OFMessage msg); - - /** - * Processes a stats request from the controllers. - * Targeted message type is MULTIPART_REQUEST with FLOW, PORT, GROUP, - * GROUP_DESC subtypes. - * - * @param channel received channel - * @param msg stats request message received - */ - void processStatsRequest(Channel channel, OFMessage msg); - - /** - * Processes a role request from the controllers. - * Targeted message type is ROLE_REQUEST. - * - * @param channel received channel - * @param msg role request message received - */ - void processRoleRequest(Channel channel, OFMessage msg); - - /** - * Processes a features request from the controllers. - * - * @param channel received channel - * @param msg received features request - */ - void processFeaturesRequest(Channel channel, OFMessage msg); - - /** - * Processes LLDP packets from the controller. - * - * @param channel received channel - * @param msg packet out message with lldp - */ - void processLldp(Channel channel, OFMessage msg); - - /** - * Sends hello to the controller. - * - * @param channel received channel - */ - void sendOfHello(Channel channel); - - /** - * Processes echo request from the controllers. - * - * @param channel received channel - * @param msg echo request message - */ - void processEchoRequest(Channel channel, OFMessage msg); + Set ofSwitches(NetworkId networkId); } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java index 6a6ba065f3..cac1e0307c 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java @@ -15,111 +15,134 @@ */ package org.onosproject.ofagent.impl; -import io.netty.channel.nio.NioEventLoopGroup; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableSet; import org.onosproject.incubator.net.virtual.NetworkId; -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.DeviceEvent; -import org.onosproject.net.device.DeviceListener; -import org.onosproject.net.flow.FlowRuleEvent; -import org.onosproject.net.flow.FlowRuleListener; -import org.onosproject.net.packet.PacketContext; -import org.onosproject.net.packet.PacketProcessor; import org.onosproject.ofagent.api.OFAgent; import org.onosproject.ofagent.api.OFController; -import org.onosproject.ofagent.api.OFSwitch; -import java.util.Map; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; + +import static com.google.common.base.Preconditions.checkNotNull; /** - * Implementation of OF agent. + * Implementation of OpenFlow agent. */ public final class DefaultOFAgent implements OFAgent { private final NetworkId networkId; - private final Map, Object> services; private final Set controllers; - private final ExecutorService eventExecutor; - private final NioEventLoopGroup ioWorker; - - private final ConcurrentHashMap switchMap = new ConcurrentHashMap<>(); - private final DeviceListener deviceListener = new InternalDeviceListener(); - private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener(); - private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor(); + private final State state; private DefaultOFAgent(NetworkId networkId, - Map, Object> services, Set controllers, - ExecutorService eventExecutor, - NioEventLoopGroup ioWorker) { + State state) { this.networkId = networkId; - this.services = services; this.controllers = controllers; - this.eventExecutor = eventExecutor; - this.ioWorker = ioWorker; + this.state = state; } @Override public NetworkId networkId() { - return null; + return networkId; } @Override public Set controllers() { - return null; + return controllers; } @Override - public void start() { - // TODO add listeners to the services - // TODO connect all virtual devices in this network to the controllers + public State state() { + return state; } @Override - public void stop() { - // TODO remove listeners from the services - // TODO disconnect all active connections + public int hashCode() { + return Objects.hash(networkId); } - private void connect(OFSwitch ofSwitch, OFController controller) { - // TODO connect the switch to the controller + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj instanceof DefaultOFAgent) { + DefaultOFAgent that = (DefaultOFAgent) obj; + if (Objects.equals(networkId, that.networkId)) { + return true; + } + } + return false; } - private void disconnect(OFSwitch ofSwitch, OFController controller) { - // TODO disconnect the controller from the ofSwitch + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("networkId", this.networkId) + .add("controllers", this.controllers) + .add("state", this.state) + .toString(); } - private class InternalFlowRuleListener implements FlowRuleListener { + /** + * Returns new builder instance. + * + * @return default ofagent builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Returns new builder instance from the existing agent. + * + * @param ofAgent the existing agent + * @return default ofagent builder + */ + public static Builder builder(OFAgent ofAgent) { + return new Builder() + .networkId(ofAgent.networkId()) + .controllers(ofAgent.controllers()) + .state(ofAgent.state()); + } + + public static final class Builder implements OFAgent.Builder { + + private NetworkId networkId; + private Set controllers; + private State state; + + private Builder() { + } @Override - public void event(FlowRuleEvent event) { - // TODO handle flow rule event - } - } + public OFAgent build() { + checkNotNull(networkId, "Network ID cannot be null"); + checkNotNull(state, "State cannot be null"); + controllers = controllers == null ? ImmutableSet.of() : controllers; - private class InternalDeviceListener implements DeviceListener { + return new DefaultOFAgent(networkId, controllers, state); + } @Override - public void event(DeviceEvent event) { - // TODO handle device event - // device detected: connect the device to controllers - // device removed: disconnect and remove the switch from the map - // device state available: connect the switch to the controllers - // device state unavailable: disconnect the switch from the controllers - // port added: send out features reply - // port status change + public Builder networkId(NetworkId networkId) { + this.networkId = networkId; + return this; } - } - - private class InternalPacketProcessor implements PacketProcessor { @Override - public void process(PacketContext context) { - // TODO handle packet-in + public Builder controllers(Set controllers) { + this.controllers = controllers; + return this; + } + + @Override + public Builder state(State state) { + this.state = state; + return this; } } - - // TODO implement builder } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java index 32956c4a27..310c5a44b6 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java @@ -15,22 +15,41 @@ */ package org.onosproject.ofagent.impl; +import com.google.common.base.MoreObjects; import org.onlab.packet.IpAddress; import org.onlab.packet.TpPort; import org.onosproject.ofagent.api.OFController; -/** - * Implementation of tenant openflow controller. - */ -public class DefaultOFController implements OFController { - private IpAddress ip; - private TpPort port; +import java.util.Objects; - public DefaultOFController(IpAddress ip, TpPort port) { +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Implementation of the default OpenFlow controller. + */ +public final class DefaultOFController implements OFController { + + private final IpAddress ip; + private final TpPort port; + + private DefaultOFController(IpAddress ip, TpPort port) { this.ip = ip; this.port = port; } + /** + * Returns new OpenFlow controller with the supplied IP and port. + * + * @param ip ip address + * @param port port number + * @return openflow controller + */ + public static DefaultOFController of(IpAddress ip, TpPort port) { + checkNotNull(ip, "Controller IP address cannot be null"); + checkNotNull(port, "Controller port address cannot be null"); + return new DefaultOFController(ip, port); + } + @Override public IpAddress ip() { return ip; @@ -40,4 +59,33 @@ public class DefaultOFController implements OFController { public TpPort port() { return port; } + + @Override + public int hashCode() { + return Objects.hash(ip, port); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj instanceof DefaultOFController) { + DefaultOFController that = (DefaultOFController) obj; + if (Objects.equals(ip, that.ip) && + Objects.equals(port, that.port)) { + return true; + } + } + return false; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("ip", this.ip) + .add("port", this.port) + .toString(); + } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java index 4258321f95..878fae0e9d 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java @@ -15,10 +15,8 @@ */ package org.onosproject.ofagent.impl; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableSet; import io.netty.channel.Channel; -import org.onosproject.net.Device; -import org.onosproject.net.DeviceId; import org.onosproject.net.Port; import org.onosproject.net.flow.FlowRule; import org.onosproject.net.packet.InboundPacket; @@ -35,7 +33,7 @@ import org.projectfloodlight.openflow.protocol.OFMessage; import org.projectfloodlight.openflow.protocol.OFVersion; import org.projectfloodlight.openflow.types.DatapathId; -import java.util.List; +import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -44,7 +42,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.projectfloodlight.openflow.protocol.OFControllerRole.*; /** - * Implementation of OF switch. + * Implementation of the default OpenFlow switch. */ public final class DefaultOFSwitch implements OFSwitch { @@ -53,49 +51,34 @@ public final class DefaultOFSwitch implements OFSwitch { private static final long NUM_BUFFERS = 1024; private static final short NUM_TABLES = 3; - private final Device device; + private final DatapathId dpId; private final OFSwitchCapabilities capabilities; - private final DatapathId datapathId; private final ConcurrentHashMap controllerRoleMap = new ConcurrentHashMap<>(); + private static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13); - protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13); - private int handshakeTransactionIds; + private int handshakeTransactionIds = -1; - public DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) { - this.device = device; + private DefaultOFSwitch(DatapathId dpid, OFSwitchCapabilities capabilities) { + this.dpId = dpid; this.capabilities = capabilities; - datapathId = getDpidFromDeviceId(device.id()); - handshakeTransactionIds = -1; - } - // TODO add builder + public static DefaultOFSwitch of(DatapathId dpid, OFSwitchCapabilities capabilities) { + checkNotNull(dpid, "DPID cannot be null"); + checkNotNull(capabilities, "OF capabilities cannot be null"); + return new DefaultOFSwitch(dpid, capabilities); + } @Override - public Device device() { - return device; + public DatapathId dpid() { + return this.dpId; } @Override public OFSwitchCapabilities capabilities() { - return capabilities; - } - - @Override - public boolean isConnected() { - return !controllerChannels().isEmpty(); - } - - @Override - public void started() { - // TODO do some initial setups - } - - @Override - public void stopped() { - // TODO implement + return this.capabilities; } @Override @@ -136,7 +119,7 @@ public final class DefaultOFSwitch implements OFSwitch { @Override public Set controllerChannels() { - return null; + return ImmutableSet.copyOf(controllerRoleMap.keySet()); } @Override @@ -181,19 +164,14 @@ public final class DefaultOFSwitch implements OFSwitch { @Override public void processFeaturesRequest(Channel channel, OFMessage msg) { - // TODO process features request and send reply - List ofMessageList = Lists.newArrayList(); - - OFFeaturesReply.Builder frBuilder = FACTORY.buildFeaturesReply() - .setDatapathId(datapathId) + OFFeaturesReply ofFeaturesReply = FACTORY.buildFeaturesReply() + .setDatapathId(dpId) .setNBuffers(NUM_BUFFERS) .setNTables(NUM_TABLES) .setCapabilities(capabilities.ofSwitchCapabilities()) - .setXid(msg.getXid()); - - ofMessageList.add(frBuilder.build()); - channel.write(ofMessageList); - + .setXid(msg.getXid()) + .build(); + channel.writeAndFlush(Collections.singletonList(ofFeaturesReply)); } @Override @@ -203,38 +181,18 @@ public final class DefaultOFSwitch implements OFSwitch { @Override public void sendOfHello(Channel channel) { - List ofMessageList = Lists.newArrayList(); - OFHello.Builder ofHello = FACTORY.buildHello() - .setXid(this.handshakeTransactionIds--); - - ofMessageList.add(ofHello.build()); - channel.write(ofMessageList); + OFHello ofHello = FACTORY.buildHello() + .setXid(this.handshakeTransactionIds--) + .build(); + channel.writeAndFlush(Collections.singletonList(ofHello)); } @Override public void processEchoRequest(Channel channel, OFMessage msg) { - List ofMessageList = Lists.newArrayList(); - OFEchoReply.Builder echoBuilder = FACTORY.buildEchoReply() + OFEchoReply ofEchoReply = FACTORY.buildEchoReply() .setXid(msg.getXid()) - .setData(((OFEchoRequest) msg).getData()); - - ofMessageList.add(echoBuilder.build()); - channel.write(ofMessageList); - } - - private DatapathId getDpidFromDeviceId(DeviceId deviceId) { - String deviceIdToString = deviceId.toString().split(":")[1]; - - assert (deviceIdToString.length() == 16); - - String resultedHexString = new String(); - for (int i = 0; i < 8; i++) { - resultedHexString = resultedHexString + deviceIdToString.charAt(2 * i) - + deviceIdToString.charAt(2 * i + 1); - if (i != 7) { - resultedHexString += ":"; - } - } - return DatapathId.of(resultedHexString); + .setData(((OFEchoRequest) msg).getData()) + .build(); + channel.writeAndFlush(Collections.singletonList(ofEchoReply)); } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java new file mode 100644 index 0000000000..9f1f88fa87 --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java @@ -0,0 +1,208 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.impl; + +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.incubator.net.virtual.NetworkId; +import org.onosproject.ofagent.api.OFAgent; +import org.onosproject.ofagent.api.OFAgentEvent; +import org.onosproject.ofagent.api.OFAgentEvent.Type; +import org.onosproject.ofagent.api.OFAgentStore; +import org.onosproject.ofagent.api.OFAgentStoreDelegate; +import org.onosproject.ofagent.api.OFController; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.ofagent.api.OFAgent.State.STARTED; +import static org.onosproject.ofagent.api.OFAgentEvent.Type.*; +import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Implementation of the {@link OFAgentStore} with consistent map. + */ +@Service +@Component(immediate = true) +public class DistributedOFAgentStore extends AbstractStore + implements OFAgentStore { + + private final Logger log = getLogger(getClass()); + + private static final String ERR_NOT_FOUND = " does not exist"; + private static final String ERR_DUPLICATE = " already exists"; + + private static final KryoNamespace SERIALIZER_OFAGENT = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(OFAgent.class) + .register(OFAgent.State.class) + .register(NetworkId.class) + .register(DefaultOFAgent.class) + .register(OFController.class) + .register(DefaultOFController.class) + .build(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + private final ExecutorService eventExecutor = newSingleThreadExecutor( + groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); + private final MapEventListener ofAgentMapListener = new OFAgentMapListener(); + + private ConsistentMap ofAgentStore; + + @Activate + protected void activate() { + ApplicationId appId = coreService.registerApplication(APPLICATION_NAME); + ofAgentStore = storageService.consistentMapBuilder() + .withSerializer(Serializer.using(SERIALIZER_OFAGENT)) + .withName("ofagentstore") + .withApplicationId(appId) + .build(); + ofAgentStore.addListener(ofAgentMapListener); + + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + ofAgentStore.removeListener(ofAgentMapListener); + eventExecutor.shutdown(); + + log.info("Stopped"); + } + + @Override + public void createOfAgent(OFAgent ofAgent) { + ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> { + final String error = ofAgent.networkId() + ERR_DUPLICATE; + checkArgument(existing == null, error); + return ofAgent; + }); + } + + @Override + public void updateOfAgent(OFAgent ofAgent) { + ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> { + final String error = ofAgent.networkId() + ERR_NOT_FOUND; + checkArgument(existing != null, error); + return ofAgent; + }); + } + + @Override + public OFAgent removeOfAgent(NetworkId networkId) { + Versioned ofAgent = ofAgentStore.remove(networkId); + return ofAgent == null ? null : ofAgent.value(); + } + + @Override + public OFAgent ofAgent(NetworkId networkId) { + Versioned ofAgent = ofAgentStore.get(networkId); + return ofAgent == null ? null : ofAgent.value(); + } + + @Override + public Set ofAgents() { + Set ofAgents = ofAgentStore.values().stream() + .map(Versioned::value) + .collect(Collectors.toSet()); + return ImmutableSet.copyOf(ofAgents); + } + + private class OFAgentMapListener implements MapEventListener { + + @Override + public void event(MapEvent event) { + switch (event.type()) { + case INSERT: + eventExecutor.execute(() -> { + log.debug("OFAgent for network {} created", event.key()); + notifyDelegate(new OFAgentEvent( + Type.OFAGENT_CREATED, + event.newValue().value())); + }); + break; + case UPDATE: + eventExecutor.execute(() -> { + log.debug("OFAgent for network {} updated", event.key()); + processUpdated(event.oldValue().value(), event.newValue().value()); + }); + break; + case REMOVE: + eventExecutor.execute(() -> { + log.debug("OFAgent for network {} removed", event.key()); + notifyDelegate(new OFAgentEvent( + Type.OFAGENT_REMOVED, + event.oldValue().value())); + }); + break; + default: + break; + } + } + + private void processUpdated(OFAgent oldValue, OFAgent newValue) { + if (!oldValue.controllers().equals(newValue.controllers())) { + oldValue.controllers().stream() + .filter(controller -> !newValue.controllers().contains(controller)) + .forEach(controller -> notifyDelegate(new OFAgentEvent( + OFAGENT_CONTROLLER_REMOVED, + newValue, + controller) + )); + + newValue.controllers().stream() + .filter(controller -> !oldValue.controllers().contains(controller)) + .forEach(controller -> notifyDelegate(new OFAgentEvent( + OFAGENT_CONTROLLER_ADDED, + newValue, + controller + ))); + } + + if (oldValue.state() != newValue.state()) { + Type eventType = newValue.state() == STARTED ? OFAGENT_STARTED : OFAGENT_STOPPED; + notifyDelegate(new OFAgentEvent(eventType, newValue)); + } + } + } +} diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java index 3005f736fa..ce7c30fa62 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java @@ -15,99 +15,258 @@ */ package org.onosproject.ofagent.impl; -import io.netty.channel.nio.NioEventLoopGroup; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipEvent; +import org.onosproject.cluster.LeadershipEventListener; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.event.ListenerRegistry; import org.onosproject.incubator.net.virtual.NetworkId; import org.onosproject.incubator.net.virtual.VirtualNetworkEvent; import org.onosproject.incubator.net.virtual.VirtualNetworkListener; +import org.onosproject.incubator.net.virtual.VirtualNetworkService; import org.onosproject.ofagent.api.OFAgent; +import org.onosproject.ofagent.api.OFAgentAdminService; +import org.onosproject.ofagent.api.OFAgentEvent; +import org.onosproject.ofagent.api.OFAgentListener; import org.onosproject.ofagent.api.OFAgentService; -import org.onosproject.ofagent.api.OFController; +import org.onosproject.ofagent.api.OFAgentStore; +import org.onosproject.ofagent.api.OFAgentStoreDelegate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import static org.onlab.util.BoundedThreadPool.newFixedThreadPool; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor; import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.ofagent.api.OFAgent.State.STARTED; +import static org.onosproject.ofagent.api.OFAgent.State.STOPPED; /** * Implementation of OpenFlow agent service. */ @Component(immediate = true) @Service -public class OFAgentManager implements OFAgentService { +public class OFAgentManager extends ListenerRegistry + implements OFAgentService, OFAgentAdminService { private final Logger log = LoggerFactory.getLogger(getClass()); - // TODO make it to be configurable with component config - private static final int NUM_OF_THREADS = 1; - private final ExecutorService eventExecutor = newFixedThreadPool( - NUM_OF_THREADS, - groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); + private static final String MSG_OFAGENT = "OFAgent for network %s %s"; + private static final String MSG_CREATED = "created"; + private static final String MSG_UPDATED = "updated"; + private static final String MSG_REMOVED = "removed"; + private static final String MSG_IN_STARTED = "is already in active state, do nothing"; + private static final String MSG_IN_STOPPED = "is already in inactive state, do nothing"; - // TODO change it to ConsistentMap and support multi-instance mode - private ConcurrentHashMap agentMap = new ConcurrentHashMap<>(); - private NioEventLoopGroup ioWorker; + private static final String ERR_NULL_OFAGENT = "OFAgent cannot be null"; + private static final String ERR_NULL_NETID = "Network ID cannot be null"; + private static final String ERR_NOT_EXIST = "does not exist"; + private static final String ERR_IN_USE = "is in start state, stop the agent first"; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected LeadershipService leadershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected VirtualNetworkService virtualNetService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected OFAgentStore ofAgentStore; + + private final ExecutorService eventExecutor = newSingleThreadExecutor( + groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); + private final LeadershipEventListener leadershipListener = new InternalLeadershipListener(); + private final VirtualNetworkListener virtualNetListener = new InternalVirtualNetworkListener(); + private final OFAgentStoreDelegate delegate = new InternalOFAgentStoreDelegate(); + + private ApplicationId appId; + private NodeId localId; @Activate protected void activate() { - // TODO listen to the virtual network event - ioWorker = new NioEventLoopGroup(); + appId = coreService.registerApplication(APPLICATION_NAME); + localId = clusterService.getLocalNode().id(); + leadershipService.runForLeadership(appId.name()); + + ofAgentStore.setDelegate(delegate); + virtualNetService.addListener(virtualNetListener); + leadershipService.addListener(leadershipListener); + log.info("Started"); } @Deactivate protected void deactivate() { - ioWorker.shutdownGracefully(); + leadershipService.removeListener(leadershipListener); + virtualNetService.removeListener(virtualNetListener); + ofAgentStore.unsetDelegate(delegate); + ofAgentStore.ofAgents().forEach(ofAgent -> stopAgent(ofAgent.networkId())); + eventExecutor.shutdown(); + leadershipService.withdraw(appId.name()); + log.info("Stopped"); } @Override - public Set agents() { - // TODO return existing agents - return null; + public void createAgent(OFAgent ofAgent) { + checkNotNull(ofAgent, ERR_NULL_OFAGENT); + if (ofAgent.state() == STARTED) { + log.warn(String.format(MSG_OFAGENT, ofAgent.networkId(), ERR_IN_USE)); + return; + } + ofAgentStore.createOfAgent(ofAgent); + log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_CREATED)); } @Override - public void createAgent(NetworkId networkId, OFController... controllers) { - // TODO create OFAgent instance with the given network ID, controllers - // TODO and device, flowRule, link, and packet service for the virtual network - // TODO start the OFAgent only if the virtual network state is active + public void updateAgent(OFAgent ofAgent) { + checkNotNull(ofAgent, ERR_NULL_OFAGENT); + ofAgentStore.updateOfAgent(ofAgent); + log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_UPDATED)); } @Override public void removeAgent(NetworkId networkId) { - // TODO stop and remove the OFAgent for the network + checkNotNull(networkId, ERR_NULL_NETID); + synchronized (this) { + OFAgent existing = ofAgentStore.ofAgent(networkId); + if (existing == null) { + final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST); + throw new IllegalStateException(error); + } + if (existing.state() == STARTED) { + final String error = String.format(MSG_OFAGENT, networkId, ERR_IN_USE); + throw new IllegalStateException(error); + } + ofAgentStore.removeOfAgent(networkId); + log.info(String.format(MSG_OFAGENT, networkId, MSG_REMOVED)); + } } @Override public void startAgent(NetworkId networkId) { - // TODO starts the agent for the network + checkNotNull(networkId, ERR_NULL_NETID); + synchronized (this) { + OFAgent existing = ofAgentStore.ofAgent(networkId); + if (existing == null) { + final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST); + throw new IllegalStateException(error); + } + if (existing.state() == STARTED) { + log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STARTED)); + return; + } + OFAgent updated = DefaultOFAgent.builder(existing).state(STARTED).build(); + ofAgentStore.updateOfAgent(updated); + } } @Override public void stopAgent(NetworkId networkId) { - // TODO stops the agent for the network + checkNotNull(networkId, ERR_NULL_NETID); + synchronized (this) { + OFAgent existing = ofAgentStore.ofAgent(networkId); + if (existing == null) { + final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST); + throw new IllegalStateException(error); + } + if (existing.state() == STOPPED) { + log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STOPPED)); + return; + } + OFAgent updated = DefaultOFAgent.builder(existing).state(STOPPED).build(); + ofAgentStore.updateOfAgent(updated); + } } @Override - public boolean isActive(NetworkId networkId) { - // TODO manage the OF agent status - return false; + public Set agents() { + return ofAgentStore.ofAgents(); + } + + @Override + public OFAgent agent(NetworkId networkId) { + checkNotNull(networkId, ERR_NULL_NETID); + return ofAgentStore.ofAgent(networkId); + } + + private class InternalLeadershipListener implements LeadershipEventListener { + + @Override + public boolean isRelevant(LeadershipEvent event) { + // TODO check if local node is relevant to the leadership change event + return false; + } + + @Override + public void event(LeadershipEvent event) { + switch (event.type()) { + case LEADER_CHANGED: + case LEADER_AND_CANDIDATES_CHANGED: + // TODO handle leadership changed events -> restart agents? + default: + break; + } + } } private class InternalVirtualNetworkListener implements VirtualNetworkListener { + @Override + public boolean isRelevant(VirtualNetworkEvent event) { + // do not allow without leadership + return Objects.equals(localId, leadershipService.getLeader(appId.name())); + } + @Override public void event(VirtualNetworkEvent event) { - // TODO handle virtual network start and stop + switch (event.type()) { + case NETWORK_UPDATED: + // TODO handle virtual network stopped -> stop agent + break; + case NETWORK_REMOVED: + // TODO remove related OFAgent -> stop agent + break; + case NETWORK_ADDED: + case VIRTUAL_DEVICE_ADDED: + case VIRTUAL_DEVICE_UPDATED: + case VIRTUAL_DEVICE_REMOVED: + case VIRTUAL_PORT_ADDED: + case VIRTUAL_PORT_UPDATED: + case VIRTUAL_PORT_REMOVED: + default: + // do nothing + break; + } + } + } + + private class InternalOFAgentStoreDelegate implements OFAgentStoreDelegate { + + @Override + public void notify(OFAgentEvent event) { + if (event != null) { + log.trace("send ofagent event {}", event); + process(event); + } } } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java index 3853073710..6514b3ce41 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java @@ -15,39 +15,35 @@ */ package org.onosproject.ofagent.impl; +import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.ReadTimeoutException; -import org.onlab.osgi.DefaultServiceDirectory; -import org.onlab.osgi.ServiceDirectory; -import org.onosproject.incubator.net.virtual.VirtualNetworkService; import org.onosproject.ofagent.api.OFSwitch; import org.projectfloodlight.openflow.protocol.OFErrorMsg; -import org.projectfloodlight.openflow.protocol.OFFactories; -import org.projectfloodlight.openflow.protocol.OFFactory; import org.projectfloodlight.openflow.protocol.OFMessage; -import org.projectfloodlight.openflow.protocol.OFVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.channels.ClosedChannelException; -import java.util.List; import java.util.concurrent.RejectedExecutionException; +import static org.onosproject.ofagent.impl.OFChannelHandler.ChannelState.INIT; + /** * Implementation of OpenFlow channel handler. * It processes OpenFlow message according to the channel state. */ public final class OFChannelHandler extends ChannelDuplexHandler { + private static final String MSG_CHANNEL_STATE = "Set channel(%s) state: %s"; + private final Logger log = LoggerFactory.getLogger(getClass()); private final OFSwitch ofSwitch; - private ChannelHandlerContext ctx; + private Channel channel; private ChannelState state; - protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13); - protected VirtualNetworkService vNetService; enum ChannelState { @@ -62,7 +58,6 @@ public final class OFChannelHandler extends ChannelDuplexHandler { @Override void processOFMessage(final OFChannelHandler handler, final OFMessage msg) { - switch (msg.getType()) { case HELLO: handler.setState(ChannelState.WAIT_FEATURE_REQUEST); @@ -77,17 +72,16 @@ public final class OFChannelHandler extends ChannelDuplexHandler { @Override void processOFMessage(final OFChannelHandler handler, final OFMessage msg) { - switch (msg.getType()) { case FEATURES_REQUEST: - handler.ofSwitch.processFeaturesRequest(handler.ctx.channel(), msg); + handler.ofSwitch.processFeaturesRequest(handler.channel, msg); handler.setState(ChannelState.ESTABLISHED); break; case ECHO_REQUEST: - handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg); + handler.ofSwitch.processEchoRequest(handler.channel, msg); break; case ERROR: - handler.logErrorClose(handler.ctx, (OFErrorMsg) msg); + handler.logErrorClose((OFErrorMsg) msg); break; default: handler.illegalMessageReceived(msg); @@ -117,10 +111,10 @@ public final class OFChannelHandler extends ChannelDuplexHandler { //TODO implement break; case ECHO_REQUEST: - handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg); + handler.ofSwitch.processEchoRequest(handler.channel, msg); break; case ERROR: - handler.logErrorClose(handler.ctx, (OFErrorMsg) msg); + handler.logErrorClose((OFErrorMsg) msg); break; default: handler.unhandledMessageReceived(msg); @@ -128,8 +122,8 @@ public final class OFChannelHandler extends ChannelDuplexHandler { } } }; - abstract void processOFMessage(final OFChannelHandler handler, - final OFMessage msg); + + abstract void processOFMessage(final OFChannelHandler handler, final OFMessage msg); } /** @@ -140,47 +134,36 @@ public final class OFChannelHandler extends ChannelDuplexHandler { public OFChannelHandler(OFSwitch ofSwitch) { super(); this.ofSwitch = ofSwitch; - - setState(ChannelState.INIT); - - ServiceDirectory services = new DefaultServiceDirectory(); - vNetService = services.get(VirtualNetworkService.class); + setState(INIT); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - log.debug("Channel Active. Send OF_13 Hello to {}", ctx.channel().remoteAddress()); - + this.channel = ctx.channel(); + // FIXME move this to channel handler and add channel when OF handshake is done + ofSwitch.addControllerChannel(channel); try { - ofSwitch.sendOfHello(ctx.channel()); + ofSwitch.sendOfHello(channel); + log.trace("Send OF_13 Hello to {}", channel.remoteAddress()); setState(ChannelState.WAIT_HELLO); - } catch (Throwable cause) { - log.error("Exception occured because of{}", cause.getMessage()); + } catch (Exception ex) { + log.error("Failed sending OF_13 Hello to {} for {}", channel.remoteAddress(), ex.getMessage()); } } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { + public void channelInactive(ChannelHandlerContext ctx) { + ofSwitch.deleteControllerChannel(channel); + log.info("Device {} disconnected from controller {}", ofSwitch.dpid(), channel.remoteAddress()); + } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { - if (msg instanceof List) { - ((List) msg).forEach(ofm -> { - state.processOFMessage(this, (OFMessage) ofm); - }); - } else { - state.processOFMessage(this, (OFMessage) msg); - } - } catch (Throwable cause) { - log.error("Exception occured {}", cause.getMessage()); + state.processOFMessage(this, (OFMessage) msg); + } catch (Exception ex) { + ctx.fireExceptionCaught(ex); } - - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) - throws Exception { } @Override @@ -188,41 +171,40 @@ public final class OFChannelHandler extends ChannelDuplexHandler { if (cause instanceof ReadTimeoutException) { log.error("Connection closed because of ReadTimeoutException {}", cause.getMessage()); } else if (cause instanceof ClosedChannelException) { - log.error("ClosedChannelException occured"); + log.error("ClosedChannelException occurred"); return; } else if (cause instanceof RejectedExecutionException) { log.error("Could not process message: queue full"); } else if (cause instanceof IOException) { - log.error("IOException occured"); + log.error("IOException occurred"); } else { log.error("Error while processing message from switch {}", cause.getMessage()); } - ctx.close(); + channel.close(); } private void setState(ChannelState state) { this.state = state; + if (state != INIT) { + log.debug(String.format(MSG_CHANNEL_STATE, channel.remoteAddress(), state.name())); + } } - private void logErrorClose(ChannelHandlerContext ctx, OFErrorMsg errorMsg) { + private void logErrorClose(OFErrorMsg errorMsg) { log.error("{} from switch {} in state {}", errorMsg, - ofSwitch.device().id().toString(), + ofSwitch.dpid(), state); - - log.error("Disconnecting..."); - ctx.close(); + channel.close(); } private void illegalMessageReceived(OFMessage ofMessage) { log.warn("Controller should never send this message {} in current state {}", - ofMessage.getType().toString(), - state); + ofMessage.getType(), state); } private void unhandledMessageReceived(OFMessage ofMessage) { - log.warn("Unhandled message {} received in state {}. Ignored", - ofMessage.getType().toString(), - state); + log.warn("Unexpected message {} received in state {}", + ofMessage.getType(), state); } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java index e93b0c9416..58df71cf41 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java @@ -39,7 +39,6 @@ public final class OFChannelInitializer extends ChannelInitializer MAX_RETRY) { + log.warn(String.format(MSG_STATE, + ofSwitch.dpid(), + MSG_FAILED, + controller.ip(), + controller.port())); + } else { this.connect(); } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java index 7e3d1d4378..0e07d41bfb 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java @@ -32,22 +32,20 @@ import java.util.List; public final class OFMessageDecoder extends ByteToMessageDecoder { private final Logger log = LoggerFactory.getLogger(getClass()); + private final OFMessageReader reader = OFFactories.getGenericReader(); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - if (!ctx.channel().isActive()) { return; } try { - OFMessageReader reader = OFFactories.getGenericReader(); OFMessage message = reader.readFrom(in); out.add(message); } catch (Throwable cause) { - log.error("Exception occured while processing decoding because of {}", cause.getMessage()); + log.error("Failed decode OF message for {}", cause.getMessage()); } - } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java index 3d9f8eeaa0..67d0ccc783 100644 --- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java @@ -16,50 +16,24 @@ package org.onosproject.ofagent.impl; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.EncoderException; import io.netty.handler.codec.MessageToByteEncoder; import org.projectfloodlight.openflow.protocol.OFMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Encodes OFMessage to a byte buffer. */ public final class OFMessageEncoder extends MessageToByteEncoder> { - private final Logger log = LoggerFactory.getLogger(getClass()); @Override protected void encode(ChannelHandlerContext ctx, Iterable msgList, ByteBuf out) throws Exception { - if (!ctx.channel().isActive()) { return; } - if (msgList instanceof Iterable) { - msgList.forEach(msg -> { - try { - ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(); - msg.writeTo(byteBuf); - - ctx.writeAndFlush(byteBuf); - } catch (Exception e) { - log.error("error occured because of {}", e.getMessage()); - } - }); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (cause instanceof EncoderException) { - log.error("Connection closed because of EncoderException {}", cause.getMessage()); - ctx.close(); - } else { - log.error("Exception occured while processing encoding because of {}", cause.getMessage()); - ctx.close(); + for (OFMessage ofm : msgList) { + ofm.writeTo(out); } } } diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java new file mode 100644 index 0000000000..f522d68bb9 --- /dev/null +++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java @@ -0,0 +1,372 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.impl; + +import com.google.common.collect.ImmutableSet; +import io.netty.channel.ChannelOutboundInvoker; +import io.netty.channel.nio.NioEventLoopGroup; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.Tools; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.incubator.net.virtual.NetworkId; +import org.onosproject.incubator.net.virtual.VirtualNetworkService; +import org.onosproject.net.Device; +import org.onosproject.net.DeviceId; +import org.onosproject.net.device.DeviceEvent; +import org.onosproject.net.device.DeviceListener; +import org.onosproject.net.device.DeviceService; +import org.onosproject.net.flow.FlowRuleEvent; +import org.onosproject.net.flow.FlowRuleListener; +import org.onosproject.net.flow.FlowRuleService; +import org.onosproject.net.packet.PacketContext; +import org.onosproject.net.packet.PacketProcessor; +import org.onosproject.net.packet.PacketService; +import org.onosproject.ofagent.api.OFAgent; +import org.onosproject.ofagent.api.OFAgentEvent; +import org.onosproject.ofagent.api.OFAgentListener; +import org.onosproject.ofagent.api.OFAgentService; +import org.onosproject.ofagent.api.OFController; +import org.onosproject.ofagent.api.OFSwitch; +import org.onosproject.ofagent.api.OFSwitchCapabilities; +import org.onosproject.ofagent.api.OFSwitchService; +import org.projectfloodlight.openflow.types.DatapathId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME; + +/** + * Manages OF switches. + */ +@Component(immediate = true) +@Service +public class OFSwitchManager implements OFSwitchService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final OFSwitchCapabilities DEFAULT_CAPABILITIES = DefaultOFSwitchCapabilities.builder() + .flowStats() + .tableStats() + .portStats() + .groupStats() + .queueStats() + .ipReasm() + .portBlocked() + .build(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected LeadershipService leadershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected VirtualNetworkService virtualNetService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected OFAgentService ofAgentService; + + private final ConcurrentHashMap ofSwitchMap = new ConcurrentHashMap<>(); + private final ExecutorService eventExecutor = newSingleThreadExecutor( + groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); + private final OFAgentListener ofAgentListener = new InternalOFAgentListener(); + private final DeviceListener deviceListener = new InternalDeviceListener(); + private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener(); + private final PacketProcessor packetProcessor = new InternalPacketProcessor(); + + private NioEventLoopGroup ioWorker; + private ApplicationId appId; + private NodeId localId; + + @Activate + protected void activate() { + appId = coreService.registerApplication(APPLICATION_NAME); + localId = clusterService.getLocalNode().id(); + ioWorker = new NioEventLoopGroup(); + ofAgentService.addListener(ofAgentListener); + + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + ofAgentService.removeListener(ofAgentListener); + ofAgentService.agents().forEach(this::processOFAgentStopped); + + ioWorker.shutdownGracefully(); + eventExecutor.shutdown(); + + log.info("Stopped"); + } + + @Override + public Set ofSwitches() { + return ImmutableSet.copyOf(ofSwitchMap.values()); + } + + @Override + public Set ofSwitches(NetworkId networkId) { + Set ofSwitches = devices(networkId).stream() + .map(ofSwitchMap::get) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + return ImmutableSet.copyOf(ofSwitches); + } + + private void addOFSwitch(DeviceId deviceId) { + OFSwitch ofSwitch = DefaultOFSwitch.of( + dpidWithDeviceId(deviceId), + DEFAULT_CAPABILITIES); + ofSwitchMap.put(deviceId, ofSwitch); + log.debug("Added virtual OF switch for {}", deviceId); + // TODO connect controllers if the agent is in started state + } + + private void deleteOFSwitch(DeviceId deviceId) { + // TODO disconnect switch if it has active connection + OFSwitch ofSwitch = ofSwitchMap.remove(deviceId); + if (ofSwitch != null) { + log.debug("Removed virtual OFSwitch for {}", deviceId); + } + } + + private void connectController(OFSwitch ofSwitch, Set controllers) { + controllers.forEach(controller -> { + OFConnectionHandler connectionHandler = new OFConnectionHandler( + ofSwitch, + controller, + ioWorker); + connectionHandler.connect(); + }); + log.debug("Connection requested for {}, controller:{}", ofSwitch.dpid(), controllers); + } + + private void disconnectController(OFSwitch ofSwitch, Set controllers) { + Set controllerAddrs = controllers.stream() + .map(ctrl -> new InetSocketAddress(ctrl.ip().toInetAddress(), ctrl.port().toInt())) + .collect(Collectors.toSet()); + + ofSwitch.controllerChannels().stream() + .filter(channel -> controllerAddrs.contains(channel.remoteAddress())) + .forEach(ChannelOutboundInvoker::disconnect); + log.debug("Disconnection requested for {}, controller:{}", ofSwitch.dpid(), controllers); + } + + private Set devices(NetworkId networkId) { + DeviceService deviceService = virtualNetService.get( + networkId, + DeviceService.class); + Set deviceIds = Tools.stream(deviceService.getAvailableDevices()) + .map(Device::id) + .collect(Collectors.toSet()); + return ImmutableSet.copyOf(deviceIds); + } + + private DatapathId dpidWithDeviceId(DeviceId deviceId) { + String strDeviceId = deviceId.toString().split(":")[1]; + checkArgument(strDeviceId.length() == 16, "Invalid device ID " + strDeviceId); + + String resultedHexString = ""; + for (int i = 0; i < 8; i++) { + resultedHexString = resultedHexString + strDeviceId.charAt(2 * i) + + strDeviceId.charAt(2 * i + 1); + if (i != 7) { + resultedHexString += ":"; + } + } + return DatapathId.of(resultedHexString); + } + + private void processOFAgentCreated(OFAgent ofAgent) { + devices(ofAgent.networkId()).forEach(this::addOFSwitch); + DeviceService deviceService = virtualNetService.get( + ofAgent.networkId(), + DeviceService.class); + deviceService.addListener(deviceListener); + } + + private void processOFAgentRemoved(OFAgent ofAgent) { + devices(ofAgent.networkId()).forEach(this::deleteOFSwitch); + DeviceService deviceService = virtualNetService.get( + ofAgent.networkId(), + DeviceService.class); + deviceService.removeListener(deviceListener); + } + + private void processOFAgentStarted(OFAgent ofAgent) { + devices(ofAgent.networkId()).forEach(deviceId -> { + OFSwitch ofSwitch = ofSwitchMap.get(deviceId); + if (ofSwitch != null) { + connectController(ofSwitch, ofAgent.controllers()); + } + }); + + PacketService packetService = virtualNetService.get( + ofAgent.networkId(), + PacketService.class); + packetService.addProcessor(packetProcessor, PacketProcessor.director(0)); + + FlowRuleService flowRuleService = virtualNetService.get( + ofAgent.networkId(), + FlowRuleService.class); + flowRuleService.addListener(flowRuleListener); + } + + private void processOFAgentStopped(OFAgent ofAgent) { + devices(ofAgent.networkId()).forEach(deviceId -> { + OFSwitch ofSwitch = ofSwitchMap.get(deviceId); + if (ofSwitch != null) { + disconnectController(ofSwitch, ofAgent.controllers()); + } + }); + + PacketService packetService = virtualNetService.get( + ofAgent.networkId(), + PacketService.class); + packetService.removeProcessor(packetProcessor); + + FlowRuleService flowRuleService = virtualNetService.get( + ofAgent.networkId(), + FlowRuleService.class); + flowRuleService.removeListener(flowRuleListener); + } + + private class InternalOFAgentListener implements OFAgentListener { + + @Override + public boolean isRelevant(OFAgentEvent event) { + return Objects.equals(localId, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(OFAgentEvent event) { + switch (event.type()) { + case OFAGENT_CREATED: + eventExecutor.execute(() -> { + OFAgent ofAgent = event.subject(); + log.debug("Processing OFAgent created: {}", ofAgent); + processOFAgentCreated(ofAgent); + }); + break; + case OFAGENT_REMOVED: + eventExecutor.execute(() -> { + OFAgent ofAgent = event.subject(); + log.debug("Processing OFAgent removed: {}", ofAgent); + processOFAgentRemoved(ofAgent); + }); + break; + case OFAGENT_CONTROLLER_ADDED: + // TODO handle additional controller + break; + case OFAGENT_CONTROLLER_REMOVED: + // TODO handle removed controller + break; + case OFAGENT_STARTED: + eventExecutor.execute(() -> { + OFAgent ofAgent = event.subject(); + log.debug("Processing OFAgent started: {}", ofAgent); + processOFAgentStarted(ofAgent); + }); + break; + case OFAGENT_STOPPED: + eventExecutor.execute(() -> { + OFAgent ofAgent = event.subject(); + log.debug("Processing OFAgent stopped: {}", ofAgent); + processOFAgentStopped(ofAgent); + }); + break; + default: + // do nothing + break; + } + } + } + + private class InternalDeviceListener implements DeviceListener { + + @Override + public void event(DeviceEvent event) { + switch (event.type()) { + case DEVICE_ADDED: + eventExecutor.execute(() -> { + Device device = event.subject(); + log.debug("Processing device added: {}", device); + addOFSwitch(device.id()); + }); + break; + case DEVICE_REMOVED: + eventExecutor.execute(() -> { + Device device = event.subject(); + log.debug("Processing device added: {}", device); + deleteOFSwitch(device.id()); + }); + break; + case DEVICE_AVAILABILITY_CHANGED: + // TODO handle event + break; + case DEVICE_UPDATED: + case DEVICE_SUSPENDED: + case PORT_ADDED: + // TODO handle event + case PORT_REMOVED: + // TODO handle event + case PORT_STATS_UPDATED: + case PORT_UPDATED: + default: + break; + } + } + } + + private class InternalPacketProcessor implements PacketProcessor { + + @Override + public void process(PacketContext context) { + // TODO handle packet-in + } + } + + private class InternalFlowRuleListener implements FlowRuleListener { + + @Override + public void event(FlowRuleEvent event) { + // TODO handle flow rule event + } + } +} diff --git a/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/DefaultOFAgentTest.java b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/DefaultOFAgentTest.java new file mode 100644 index 0000000000..52f7f53374 --- /dev/null +++ b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/DefaultOFAgentTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.impl; + +import com.google.common.collect.Sets; +import com.google.common.testing.EqualsTester; +import org.junit.Test; +import org.onlab.packet.IpAddress; +import org.onlab.packet.TpPort; +import org.onosproject.incubator.net.virtual.NetworkId; +import org.onosproject.ofagent.api.OFAgent; +import org.onosproject.ofagent.api.OFController; + +import java.util.Set; + +import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable; +import static org.onosproject.ofagent.api.OFAgent.State.STARTED; +import static org.onosproject.ofagent.api.OFAgent.State.STOPPED; + +/** + * Unit test of DefaultOFAgent model entity. + */ +public class DefaultOFAgentTest { + + private static final Set CONTROLLER_1 = Sets.newHashSet( + DefaultOFController.of( + IpAddress.valueOf("192.168.0.3"), + TpPort.tpPort(6653))); + + private static final Set CONTROLLER_2 = Sets.newHashSet( + DefaultOFController.of( + IpAddress.valueOf("192.168.0.3"), + TpPort.tpPort(6653)), + DefaultOFController.of( + IpAddress.valueOf("192.168.0.4"), + TpPort.tpPort(6653))); + + private static final NetworkId NETWORK_1 = NetworkId.networkId(1); + private static final NetworkId NETWORK_2 = NetworkId.networkId(2); + + private static final OFAgent OFAGENT = DefaultOFAgent.builder() + .networkId(NETWORK_1) + .controllers(CONTROLLER_1) + .state(STOPPED) + .build(); + + private static final OFAgent SAME_AS_OFAGENT_1 = DefaultOFAgent.builder() + .networkId(NETWORK_1) + .controllers(CONTROLLER_2) + .state(STOPPED) + .build(); + + private static final OFAgent SAME_AS_OFAGENT_2 = DefaultOFAgent.builder() + .networkId(NETWORK_1) + .controllers(CONTROLLER_1) + .state(STARTED) + .build(); + + private static final OFAgent ANOTHER_OFAGENT = DefaultOFAgent.builder() + .networkId(NETWORK_2) + .controllers(CONTROLLER_1) + .state(STOPPED) + .build(); + + @Test + public void testImmutability() { + assertThatClassIsImmutable(DefaultOFAgent.class); + } + + @Test + public void testEquality() { + new EqualsTester() + .addEqualityGroup(OFAGENT, SAME_AS_OFAGENT_1, SAME_AS_OFAGENT_2) + .addEqualityGroup(ANOTHER_OFAGENT) + .testEquals(); + } +} diff --git a/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/OFAgentManagerTest.java b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/OFAgentManagerTest.java new file mode 100644 index 0000000000..52b06f480d --- /dev/null +++ b/apps/ofagent/src/test/java/org/onosproject/ofagent/impl/OFAgentManagerTest.java @@ -0,0 +1,266 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.ofagent.impl; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.junit.TestTools; +import org.onlab.junit.TestUtils; +import org.onlab.packet.IpAddress; +import org.onlab.packet.TpPort; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.DefaultControllerNode; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.core.DefaultApplicationId; +import org.onosproject.event.Event; +import org.onosproject.incubator.net.virtual.NetworkId; +import org.onosproject.incubator.net.virtual.VirtualNetworkService; +import org.onosproject.ofagent.api.OFAgent; +import org.onosproject.ofagent.api.OFAgentEvent; +import org.onosproject.ofagent.api.OFAgentListener; +import org.onosproject.ofagent.api.OFController; +import org.onosproject.store.service.TestStorageService; + +import java.util.List; +import java.util.Set; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.onosproject.ofagent.api.OFAgent.State.STARTED; +import static org.onosproject.ofagent.api.OFAgent.State.STOPPED; +import static org.onosproject.ofagent.api.OFAgentEvent.Type.*; + +/** + * Junit tests for OFAgent target. + */ +public class OFAgentManagerTest { + + private static final ApplicationId APP_ID = new DefaultApplicationId(1, "test"); + private static final ControllerNode LOCAL_NODE = + new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf("127.0.0.1")); + + private static final Set CONTROLLER_1 = Sets.newHashSet( + DefaultOFController.of( + IpAddress.valueOf("192.168.0.3"), + TpPort.tpPort(6653))); + + private static final Set CONTROLLER_2 = Sets.newHashSet( + DefaultOFController.of( + IpAddress.valueOf("192.168.0.3"), + TpPort.tpPort(6653)), + DefaultOFController.of( + IpAddress.valueOf("192.168.0.4"), + TpPort.tpPort(6653))); + + private static final NetworkId NETWORK_1 = NetworkId.networkId(1); + private static final NetworkId NETWORK_2 = NetworkId.networkId(2); + + private static final OFAgent OFAGENT_1 = DefaultOFAgent.builder() + .networkId(NETWORK_1) + .state(STOPPED) + .build(); + + private static final OFAgent OFAGENT_1_CTRL_1 = DefaultOFAgent.builder() + .networkId(NETWORK_1) + .controllers(CONTROLLER_1) + .state(STOPPED) + .build(); + + private static final OFAgent OFAGENT_1_CTRL_2 = DefaultOFAgent.builder() + .networkId(NETWORK_1) + .controllers(CONTROLLER_2) + .state(STOPPED) + .build(); + + private static final OFAgent OFAGENT_2 = DefaultOFAgent.builder() + .networkId(NETWORK_2) + .state(STOPPED) + .build(); + + private final TestOFAgentListener testListener = new TestOFAgentListener(); + private final CoreService mockCoreService = createMock(CoreService.class); + private final LeadershipService mockLeadershipService = createMock(LeadershipService.class); + private final VirtualNetworkService mockVirtualNetService = createMock(VirtualNetworkService.class); + private final ClusterService mockClusterService = createMock(ClusterService.class); + + private OFAgentManager target; + private DistributedOFAgentStore ofAgentStore; + + @Before + public void setUp() throws Exception { + ofAgentStore = new DistributedOFAgentStore(); + TestUtils.setField(ofAgentStore, "coreService", createMock(CoreService.class)); + TestUtils.setField(ofAgentStore, "storageService", new TestStorageService()); + ofAgentStore.activate(); + + expect(mockCoreService.registerApplication(anyObject())) + .andReturn(APP_ID) + .anyTimes(); + replay(mockCoreService); + + expect(mockClusterService.getLocalNode()) + .andReturn(LOCAL_NODE) + .anyTimes(); + replay(mockClusterService); + + expect(mockLeadershipService.runForLeadership(anyObject())) + .andReturn(null) + .anyTimes(); + mockLeadershipService.addListener(anyObject()); + mockLeadershipService.removeListener(anyObject()); + mockLeadershipService.withdraw(anyObject()); + replay(mockLeadershipService); + + target = new OFAgentManager(); + target.coreService = mockCoreService; + target.leadershipService = mockLeadershipService; + target.virtualNetService = mockVirtualNetService; + target.clusterService = mockClusterService; + target.ofAgentStore = ofAgentStore; + target.addListener(testListener); + target.activate(); + } + + @After + public void tearDown() { + target.removeListener(testListener); + ofAgentStore.deactivate(); + target.deactivate(); + ofAgentStore = null; + target = null; + } + + @Test + public void testCreateAndRemoveAgent() { + target.createAgent(OFAGENT_1); + Set agents = target.agents(); + assertEquals("OFAgent set size did not match", 1, agents.size()); + + target.createAgent(OFAGENT_2); + agents = target.agents(); + assertEquals("OFAgent set size did not match", 2, agents.size()); + + target.removeAgent(NETWORK_1); + agents = target.agents(); + assertEquals("OFAgent set size did not match", 1, agents.size()); + + target.removeAgent(NETWORK_2); + agents = target.agents(); + assertEquals("OFAgent set size did not match", 0, agents.size()); + + validateEvents(OFAGENT_CREATED, OFAGENT_CREATED, OFAGENT_REMOVED, OFAGENT_REMOVED); + } + + @Test(expected = NullPointerException.class) + public void testCreateNullAgent() { + target.createAgent(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateDuplicateAgent() { + target.createAgent(OFAGENT_1); + target.createAgent(OFAGENT_1); + } + + @Test(expected = NullPointerException.class) + public void testRemoveNullAgent() { + target.removeAgent(null); + } + + @Test(expected = IllegalStateException.class) + public void testRemoveNotFoundAgent() { + target.removeAgent(NETWORK_1); + } + + @Test(expected = IllegalStateException.class) + public void testRemoveStartedAgent() { + target.createAgent(OFAGENT_1); + target.startAgent(NETWORK_1); + target.removeAgent(NETWORK_1); + } + + @Test + public void testStartAndStopAgent() { + target.createAgent(OFAGENT_1); + target.startAgent(NETWORK_1); + OFAgent ofAgent = target.agent(NETWORK_1); + assertEquals("OFAgent state did not match", STARTED, ofAgent.state()); + + target.stopAgent(NETWORK_1); + ofAgent = target.agent(NETWORK_1); + assertEquals("OFAgent state did not match", STOPPED, ofAgent.state()); + + validateEvents(OFAGENT_CREATED, OFAGENT_STARTED, OFAGENT_STOPPED); + } + + @Test + public void testAddController() { + target.createAgent(OFAGENT_1); + target.updateAgent(OFAGENT_1_CTRL_1); + OFAgent ofAgent = target.agent(NETWORK_1); + assertEquals("OFAgent controller did not match", CONTROLLER_1, ofAgent.controllers()); + + target.updateAgent(OFAGENT_1_CTRL_2); + ofAgent = target.agent(NETWORK_1); + assertEquals("OFAgent controller did not match", CONTROLLER_2, ofAgent.controllers()); + + validateEvents(OFAGENT_CREATED, OFAGENT_CONTROLLER_ADDED, OFAGENT_CONTROLLER_ADDED); + } + + @Test + public void testRemoveController() { + target.createAgent(OFAGENT_1_CTRL_2); + target.updateAgent(OFAGENT_1_CTRL_1); + OFAgent ofAgent = target.agent(NETWORK_1); + assertEquals("OFAgent controller did not match", CONTROLLER_1, ofAgent.controllers()); + + target.updateAgent(OFAGENT_1); + ofAgent = target.agent(NETWORK_1); + assertTrue("OFAgent controller did not match", ofAgent.controllers().isEmpty()); + + validateEvents(OFAGENT_CREATED, OFAGENT_CONTROLLER_REMOVED, OFAGENT_CONTROLLER_REMOVED); + } + + private void validateEvents(Enum... types) { + TestTools.assertAfter(100, () -> { + int i = 0; + assertEquals("Number of events does not match", types.length, testListener.events.size()); + for (Event event : testListener.events) { + assertEquals("Incorrect event received", types[i], event.type()); + i++; + } + testListener.events.clear(); + }); + } + + private static class TestOFAgentListener implements OFAgentListener { + + private List events = Lists.newArrayList(); + + @Override + public void event(OFAgentEvent event) { + events.add(event); + } + } +}