From f16e8850b3f0ca1ead788aae401d7bcc27d2db61 Mon Sep 17 00:00:00 2001 From: Jian Li Date: Tue, 22 Jan 2019 22:55:31 +0900 Subject: [PATCH] [ONOS-7902] Add node handler and a set of CLIs for kubernetes node Change-Id: Iee4a88e4af437d551a38342de339455387389f61 --- .../onosproject/k8snode/api/Constants.java | 2 +- .../k8snode/api/K8sNodeHandler.java | 2 +- apps/k8s-node/app/BUILD | 7 + .../k8snode/cli/K8sHostnameCompleter.java | 52 ++ .../k8snode/cli/K8sNodeCheckCommand.java | 103 +++ .../k8snode/cli/K8sNodeInitCommand.java | 87 +++ .../k8snode/cli/K8sNodeListCommand.java | 74 ++ .../k8snode/impl/DefaultK8sNodeHandler.java | 674 ++++++++++++++++++ .../onosproject/k8snode/util/K8sNodeUtil.java | 149 ++++ .../main/resources/definitions/K8sNode.json | 2 +- apps/k8s-node/network-cfg.json | 19 + 11 files changed, 1168 insertions(+), 3 deletions(-) create mode 100644 apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java create mode 100644 apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java create mode 100644 apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java create mode 100644 apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java create mode 100644 apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java create mode 100644 apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java create mode 100644 apps/k8s-node/network-cfg.json diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/Constants.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/Constants.java index 0cded63035..bcc154b70a 100644 --- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/Constants.java +++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/Constants.java @@ -23,7 +23,7 @@ public final class Constants { private Constants() { } - public static final String INTEGRATION_BRIDGE = "br-int"; + public static final String INTEGRATION_BRIDGE = "kbr-int"; public static final String VXLAN_TUNNEL = "vxlan"; public static final String GRE_TUNNEL = "gre"; public static final String GENEVE_TUNNEL = "geneve"; diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNodeHandler.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNodeHandler.java index 3f63ad33d5..e3f371a965 100644 --- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNodeHandler.java +++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNodeHandler.java @@ -47,7 +47,7 @@ public interface K8sNodeHandler { /** * Processes the given node for incomplete state. * - * @param k8sNode kubernete node + * @param k8sNode kubernetes node */ void processIncompleteState(K8sNode k8sNode); } diff --git a/apps/k8s-node/app/BUILD b/apps/k8s-node/app/BUILD index fd859b9510..d740f68f27 100644 --- a/apps/k8s-node/app/BUILD +++ b/apps/k8s-node/app/BUILD @@ -1,5 +1,7 @@ COMPILE_DEPS = CORE_DEPS + JACKSON + KRYO + CLI + REST + [ "//core/store/serializers:onos-core-serializers", + "//protocols/ovsdb/api:onos-protocols-ovsdb-api", + "//protocols/ovsdb/rfc:onos-protocols-ovsdb-rfc", "//apps/k8s-node/api:onos-apps-k8s-node-api", ] @@ -10,7 +12,12 @@ TEST_DEPS = TEST_ADAPTERS + TEST_REST + [ ] osgi_jar_with_tests( + api_description = "REST API for Kubernetes Node", + api_package = "org.onosproject.k8snode.web", + api_title = "Kubernetes Node API", + api_version = "1.0", karaf_command_packages = ["org.onosproject.k8snode.cli"], test_deps = TEST_DEPS, + web_context = "/onos/k8snode", deps = COMPILE_DEPS, ) diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java new file mode 100644 index 0000000000..54c66fa7db --- /dev/null +++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java @@ -0,0 +1,52 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snode.cli; + +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.apache.karaf.shell.api.console.CommandLine; +import org.apache.karaf.shell.api.console.Completer; +import org.apache.karaf.shell.api.console.Session; +import org.apache.karaf.shell.support.completers.StringsCompleter; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.k8snode.api.K8sNodeService; + +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.stream.Collectors; + +import static org.onosproject.cli.AbstractShellCommand.get; + +/** + * Kubernetes host completer. + */ +@Service +public class K8sHostnameCompleter implements Completer { + @Override + public int complete(Session session, CommandLine commandLine, List candidates) { + StringsCompleter delegate = new StringsCompleter(); + K8sNodeService nodeService = get(K8sNodeService.class); + + Set hostnames = nodeService.nodes().stream() + .map(K8sNode::hostname) + .collect(Collectors.toSet()); + SortedSet strings = delegate.getStrings(); + + strings.addAll(hostnames); + + return delegate.complete(session, commandLine, candidates); + } +} diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java new file mode 100644 index 0000000000..9879415dec --- /dev/null +++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java @@ -0,0 +1,103 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snode.cli; + +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.Completion; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.onosproject.cli.AbstractShellCommand; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.k8snode.api.K8sNodeService; +import org.onosproject.net.Device; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Port; +import org.onosproject.net.device.DeviceService; + +import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL; +import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL; +import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE; +import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL; +import static org.onosproject.net.AnnotationKeys.PORT_NAME; + +/** + * Checks detailed node init state. + */ +@Service +@Command(scope = "onos", name = "k8s-node-check", + description = "Shows detailed kubernetes node init state") +public class K8sNodeCheckCommand extends AbstractShellCommand { + + @Argument(index = 0, name = "hostname", description = "Hostname", + required = true, multiValued = false) + @Completion(K8sHostnameCompleter.class) + private String hostname = null; + + private static final String MSG_OK = "OK"; + private static final String MSG_ERROR = "ERROR"; + + @Override + protected void doExecute() { + K8sNodeService nodeService = get(K8sNodeService.class); + DeviceService deviceService = get(DeviceService.class); + + K8sNode node = nodeService.node(hostname); + if (node == null) { + print("Cannot find %s from registered nodes", hostname); + return; + } + + print("[Integration Bridge Status]"); + Device device = deviceService.getDevice(node.intgBridge()); + if (device != null) { + print("%s %s=%s available=%s %s", + deviceService.isAvailable(device.id()) ? MSG_OK : MSG_ERROR, + INTEGRATION_BRIDGE, + device.id(), + deviceService.isAvailable(device.id()), + device.annotations()); + if (node.dataIp() != null) { + printPortState(deviceService, node.intgBridge(), VXLAN_TUNNEL); + printPortState(deviceService, node.intgBridge(), GRE_TUNNEL); + printPortState(deviceService, node.intgBridge(), GENEVE_TUNNEL); + } + } else { + print("%s %s=%s is not available", + MSG_ERROR, + INTEGRATION_BRIDGE, + node.intgBridge()); + } + } + + private void printPortState(DeviceService deviceService, + DeviceId deviceId, String portName) { + Port port = deviceService.getPorts(deviceId).stream() + .filter(p -> p.annotations().value(PORT_NAME).equals(portName) && + p.isEnabled()) + .findAny().orElse(null); + + if (port != null) { + print("%s %s portNum=%s enabled=%s %s", + port.isEnabled() ? MSG_OK : MSG_ERROR, + portName, + port.number(), + port.isEnabled() ? Boolean.TRUE : Boolean.FALSE, + port.annotations()); + } else { + print("%s %s does not exist", MSG_ERROR, portName); + } + } +} diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java new file mode 100644 index 0000000000..a414e15978 --- /dev/null +++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java @@ -0,0 +1,87 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snode.cli; + +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.Completion; +import org.apache.karaf.shell.api.action.Option; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.onosproject.cli.AbstractShellCommand; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.k8snode.api.K8sNodeAdminService; +import org.onosproject.k8snode.api.K8sNodeService; + +import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE; +import static org.onosproject.k8snode.api.K8sNodeState.INIT; + +/** + * Initializes nodes for node service. + */ +@Service +@Command(scope = "onos", name = "k8s-node-init", + description = "Initializes nodes for kubernetes node service") +public class K8sNodeInitCommand extends AbstractShellCommand { + + @Option(name = "-a", aliases = "--all", description = "Apply this command to all nodes", + required = false, multiValued = false) + private boolean isAll = false; + + @Option(name = "-i", aliases = "--incomplete", + description = "Apply this command to incomplete nodes", + required = false, multiValued = false) + private boolean isIncomplete = false; + + @Argument(index = 0, name = "hostnames", description = "Hostname(s) to apply this command", + required = false, multiValued = true) + @Completion(K8sHostnameCompleter.class) + private String[] hostnames = null; + + @Override + protected void doExecute() { + K8sNodeService nodeService = get(K8sNodeService.class); + K8sNodeAdminService nodeAdminService = get(K8sNodeAdminService.class); + + if ((!isAll && !isIncomplete && hostnames == null) || + (isAll && isIncomplete) || + (isIncomplete && hostnames != null) || + (hostnames != null && isAll)) { + print("Please specify one of hostname, --all, and --incomplete options."); + return; + } + + if (isAll) { + hostnames = nodeService.nodes().stream() + .map(K8sNode::hostname).toArray(String[]::new); + } else if (isIncomplete) { + hostnames = nodeService.nodes().stream() + .filter(node -> node.state() != COMPLETE) + .map(K8sNode::hostname).toArray(String[]::new); + } + + for (String hostname : hostnames) { + K8sNode node = nodeService.node(hostname); + if (node == null) { + print("Unable to find %s", hostname); + continue; + } + print("Initializing %s", hostname); + K8sNode updated = node.updateState(INIT); + nodeAdminService.updateNode(updated); + } + print("Done."); + } +} diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java new file mode 100644 index 0000000000..04601cf2ab --- /dev/null +++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java @@ -0,0 +1,74 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snode.cli; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.collect.Lists; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.onosproject.cli.AbstractShellCommand; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.k8snode.api.K8sNodeService; + +import java.util.Comparator; +import java.util.List; + +import static org.onosproject.k8snode.util.K8sNodeUtil.prettyJson; + +/** + * Lists all nodes registered to the service. + */ +@Service +@Command(scope = "onos", name = "k8s-nodes", + description = "Lists all nodes registered in kubernetes node service") +public class K8sNodeListCommand extends AbstractShellCommand { + + private static final String FORMAT = "%-20s%-15s%-24s%-24s%-20s%-15s"; + + @Override + protected void doExecute() { + K8sNodeService nodeService = get(K8sNodeService.class); + List nodes = Lists.newArrayList(nodeService.nodes()); + nodes.sort(Comparator.comparing(K8sNode::hostname)); + + if (outputJson()) { + print("%s", json(nodes)); + } else { + print(FORMAT, "Hostname", "Type", "Integration Bridge", + "Management IP", "Data IP", "State"); + for (K8sNode node : nodes) { + print(FORMAT, + node.hostname(), + node.type(), + node.intgBridge(), + node.managementIp(), + node.dataIp() != null ? node.dataIp() : "", + node.state()); + } + print("Total %s nodes", nodeService.nodes().size()); + } + } + + private String json(List nodes) { + ObjectMapper mapper = new ObjectMapper(); + ArrayNode result = mapper.createArrayNode(); + for (K8sNode node : nodes) { + result.add(jsonForEntity(node, K8sNode.class)); + } + return prettyJson(mapper, result.toString()); + } +} diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java new file mode 100644 index 0000000000..6fb20b78dd --- /dev/null +++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java @@ -0,0 +1,674 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snode.impl; + +import org.onlab.util.Tools; +import org.onosproject.cfg.ComponentConfigService; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.k8snode.api.K8sNodeAdminService; +import org.onosproject.k8snode.api.K8sNodeEvent; +import org.onosproject.k8snode.api.K8sNodeHandler; +import org.onosproject.k8snode.api.K8sNodeListener; +import org.onosproject.k8snode.api.K8sNodeService; +import org.onosproject.k8snode.api.K8sNodeState; +import org.onosproject.net.Device; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Port; +import org.onosproject.net.behaviour.BridgeConfig; +import org.onosproject.net.behaviour.BridgeDescription; +import org.onosproject.net.behaviour.ControllerInfo; +import org.onosproject.net.behaviour.DefaultBridgeDescription; +import org.onosproject.net.behaviour.DefaultTunnelDescription; +import org.onosproject.net.behaviour.InterfaceConfig; +import org.onosproject.net.behaviour.TunnelDescription; +import org.onosproject.net.behaviour.TunnelEndPoints; +import org.onosproject.net.behaviour.TunnelKeys; +import org.onosproject.net.device.DeviceAdminService; +import org.onosproject.net.device.DeviceEvent; +import org.onosproject.net.device.DeviceListener; +import org.onosproject.net.device.DeviceService; +import org.onosproject.ovsdb.controller.OvsdbClientService; +import org.onosproject.ovsdb.controller.OvsdbController; +import org.osgi.service.component.ComponentContext; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Modified; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.slf4j.Logger; + +import java.util.Dictionary; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.packet.TpPort.tpPort; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.k8snode.api.Constants.GENEVE; +import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL; +import static org.onosproject.k8snode.api.Constants.GRE; +import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL; +import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE; +import static org.onosproject.k8snode.api.Constants.VXLAN; +import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL; +import static org.onosproject.k8snode.api.K8sNodeService.APP_ID; +import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE; +import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED; +import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE; +import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY; +import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT; +import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT; +import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT; +import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty; +import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient; +import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected; +import static org.onosproject.net.AnnotationKeys.PORT_NAME; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Service bootstraps kubernetes node based on its type. + */ +@Component(immediate = true, + property = { + OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT, + AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT + } +) +public class DefaultK8sNodeHandler implements K8sNodeHandler { + + private final Logger log = getLogger(getClass()); + + private static final String DEFAULT_OF_PROTO = "tcp"; + private static final int DEFAULT_OFPORT = 6653; + private static final int DPID_BEGIN = 3; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected LeadershipService leadershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected DeviceService deviceService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected DeviceAdminService deviceAdminService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected OvsdbController ovsdbController; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNodeService k8sNodeService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNodeAdminService k8sNodeAdminService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected ComponentConfigService componentConfigService; + + /** OVSDB server listen port. */ + private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT; + + /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */ + private boolean autoRecovery = AUTO_RECOVERY_DEFAULT; + + private final ExecutorService eventExecutor = newSingleThreadExecutor( + groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); + + private final DeviceListener ovsdbListener = new InternalOvsdbListener(); + private final DeviceListener bridgeListener = new InternalBridgeListener(); + private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener(); + + private ApplicationId appId; + private NodeId localNode; + + @Activate + protected void activate() { + appId = coreService.getAppId(APP_ID); + localNode = clusterService.getLocalNode().id(); + + componentConfigService.registerProperties(getClass()); + leadershipService.runForLeadership(appId.name()); + deviceService.addListener(ovsdbListener); + deviceService.addListener(bridgeListener); + k8sNodeService.addListener(k8sNodeListener); + + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + k8sNodeService.removeListener(k8sNodeListener); + deviceService.removeListener(bridgeListener); + deviceService.removeListener(ovsdbListener); + componentConfigService.unregisterProperties(getClass(), false); + leadershipService.withdraw(appId.name()); + eventExecutor.shutdown(); + + log.info("Stopped"); + } + + @Modified + protected void modified(ComponentContext context) { + readComponentConfiguration(context); + + log.info("Modified"); + } + + @Override + public void processInitState(K8sNode k8sNode) { + if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) { + ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum)); + return; + } + if (!deviceService.isAvailable(k8sNode.intgBridge())) { + createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge()); + } + } + + @Override + public void processDeviceCreatedState(K8sNode k8sNode) { + try { + if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) { + ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum)); + return; + } + + if (k8sNode.dataIp() != null && + !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) { + createVxlanTunnelInterface(k8sNode); + } + + if (k8sNode.dataIp() != null && + !isIntfEnabled(k8sNode, GRE_TUNNEL)) { + createGreTunnelInterface(k8sNode); + } + + if (k8sNode.dataIp() != null && + !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) { + createGeneveTunnelInterface(k8sNode); + } + } catch (Exception e) { + log.error("Exception occurred because of {}", e); + } + } + + @Override + public void processCompleteState(K8sNode k8sNode) { + // do something if needed + } + + @Override + public void processIncompleteState(K8sNode k8sNode) { + // do something if needed + } + + /** + * Extracts properties from the component configuration context. + * + * @param context the component context + */ + private void readComponentConfiguration(ComponentContext context) { + Dictionary properties = context.getProperties(); + + Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT); + if (ovsdbPortConfigured == null) { + ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT; + log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum); + } else { + ovsdbPortNum = ovsdbPortConfigured; + log.info("Configured. OVSDB port is {}", ovsdbPortNum); + } + + Boolean autoRecoveryConfigured = + getBooleanProperty(properties, AUTO_RECOVERY); + if (autoRecoveryConfigured == null) { + autoRecovery = AUTO_RECOVERY_DEFAULT; + log.info("Auto recovery flag is NOT " + + "configured, default value is {}", autoRecovery); + } else { + autoRecovery = autoRecoveryConfigured; + log.info("Configured. Auto recovery flag is {}", autoRecovery); + } + } + + /** + * Creates a bridge with a given name on a given kubernetes node. + * + * @param k8sNode kubernetes node + * @param bridgeName bridge name + * @param devId device identifier + */ + private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) { + Device device = deviceService.getDevice(k8sNode.ovsdb()); + + List controllers = clusterService.getNodes().stream() + .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO)) + .collect(Collectors.toList()); + + String dpid = devId.toString().substring(DPID_BEGIN); + + BridgeDescription.Builder builder = DefaultBridgeDescription.builder() + .name(bridgeName) + .failMode(BridgeDescription.FailMode.SECURE) + .datapathId(dpid) + .disableInBand() + .controllers(controllers); + + BridgeConfig bridgeConfig = device.as(BridgeConfig.class); + bridgeConfig.addBridge(builder.build()); + } + + /** + * Creates a VXLAN tunnel interface in a given kubernetes node. + * + * @param k8sNode kubernetes node + */ + private void createVxlanTunnelInterface(K8sNode k8sNode) { + createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL); + } + + /** + * Creates a GRE tunnel interface in a given kubernetes node. + * + * @param k8sNode kubernetes node + */ + private void createGreTunnelInterface(K8sNode k8sNode) { + createTunnelInterface(k8sNode, GRE, GRE_TUNNEL); + } + + /** + * Creates a GENEVE tunnel interface in a given kubernetes node. + * + * @param k8sNode kubernetes node + */ + private void createGeneveTunnelInterface(K8sNode k8sNode) { + createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL); + } + + /** + * Creates a tunnel interface in a given kubernetes node. + * + * @param k8sNode kubernetes node + */ + private void createTunnelInterface(K8sNode k8sNode, + String type, String intfName) { + if (isIntfEnabled(k8sNode, intfName)) { + return; + } + + Device device = deviceService.getDevice(k8sNode.ovsdb()); + if (device == null || !device.is(InterfaceConfig.class)) { + log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb()); + return; + } + + TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName); + + InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class); + ifaceConfig.addTunnelMode(intfName, tunnelDesc); + } + + /** + * Builds tunnel description according to the network type. + * + * @param type network type + * @return tunnel description + */ + private TunnelDescription buildTunnelDesc(String type, String intfName) { + if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) { + TunnelDescription.Builder tdBuilder = + DefaultTunnelDescription.builder() + .deviceId(INTEGRATION_BRIDGE) + .ifaceName(intfName) + .remote(TunnelEndPoints.flowTunnelEndpoint()) + .key(TunnelKeys.flowTunnelKey()); + + switch (type) { + case VXLAN: + tdBuilder.type(TunnelDescription.Type.VXLAN); + break; + case GRE: + tdBuilder.type(TunnelDescription.Type.GRE); + break; + case GENEVE: + tdBuilder.type(TunnelDescription.Type.GENEVE); + break; + default: + return null; + } + + return tdBuilder.build(); + } + return null; + } + + /** + * Checks whether a given network interface in a given kubernetes node + * is enabled or not. + * + * @param k8sNode kubernetes node + * @param intf network interface name + * @return true if the given interface is enabled, false otherwise + */ + private boolean isIntfEnabled(K8sNode k8sNode, String intf) { + return deviceService.isAvailable(k8sNode.intgBridge()) && + deviceService.getPorts(k8sNode.intgBridge()).stream() + .anyMatch(port -> Objects.equals( + port.annotations().value(PORT_NAME), intf) && + port.isEnabled()); + } + + /** + * Checks whether all requirements for this state are fulfilled or not. + * + * @param k8sNode kubernetes node + * @return true if all requirements are fulfilled, false otherwise + */ + private boolean isCurrentStateDone(K8sNode k8sNode) { + switch (k8sNode.state()) { + case INIT: + if (!isOvsdbConnected(k8sNode, ovsdbPortNum, + ovsdbController, deviceService)) { + return false; + } + + return deviceService.isAvailable(k8sNode.intgBridge()); + case DEVICE_CREATED: + if (k8sNode.dataIp() != null && + !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) { + return false; + } + if (k8sNode.dataIp() != null && + !isIntfEnabled(k8sNode, GRE_TUNNEL)) { + return false; + } + if (k8sNode.dataIp() != null && + !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) { + return false; + } + + return true; + case COMPLETE: + case INCOMPLETE: + // always return false + // run init CLI to re-trigger node bootstrap + return false; + default: + return true; + } + } + + /** + * Configures the kubernetes node with new state. + * + * @param k8sNode kubernetes node + * @param newState a new state + */ + private void setState(K8sNode k8sNode, K8sNodeState newState) { + if (k8sNode.state() == newState) { + return; + } + K8sNode updated = k8sNode.updateState(newState); + k8sNodeAdminService.updateNode(updated); + log.info("Changed {} state: {}", k8sNode.hostname(), newState); + } + + /** + * Bootstraps a new kubernetes node. + * + * @param k8sNode kubernetes node + */ + private void bootstrapNode(K8sNode k8sNode) { + if (isCurrentStateDone(k8sNode)) { + setState(k8sNode, k8sNode.state().nextState()); + } else { + log.trace("Processing {} state for {}", k8sNode.state(), + k8sNode.hostname()); + k8sNode.state().process(this, k8sNode); + } + } + + private void processK8sNodeRemoved(K8sNode k8sNode) { + OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController); + if (client == null) { + log.info("Failed to get ovsdb client"); + return; + } + + // delete integration bridge from the node + client.dropBridge(INTEGRATION_BRIDGE); + + // disconnect ovsdb + client.disconnect(); + } + + /** + * An internal OVSDB listener. This listener is used for listening the + * network facing events from OVSDB device. If a new OVSDB device is detected, + * ONOS tries to bootstrap the kubernetes node. + */ + private class InternalOvsdbListener implements DeviceListener { + + @Override + public boolean isRelevant(DeviceEvent event) { + return event.subject().type() == Device.Type.CONTROLLER; + } + + private boolean isRelevantHelper() { + return Objects.equals(localNode, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(DeviceEvent event) { + Device device = event.subject(); + + switch (event.type()) { + case DEVICE_AVAILABILITY_CHANGED: + case DEVICE_ADDED: + eventExecutor.execute(() -> { + + if (!isRelevantHelper()) { + return; + } + + K8sNode k8sNode = k8sNodeService.node(device.id()); + + if (k8sNode == null) { + return; + } + + if (deviceService.isAvailable(device.id())) { + log.debug("OVSDB {} detected", device.id()); + bootstrapNode(k8sNode); + } + }); + break; + case PORT_ADDED: + case PORT_REMOVED: + case DEVICE_REMOVED: + default: + // do nothing + break; + } + } + } + + /** + * An internal integration bridge listener. This listener is used for + * listening the events from integration bridge. To listen the events from + * other types of bridge such as provider bridge or tunnel bridge, we need + * to augment K8sNodeService.node() method. + */ + private class InternalBridgeListener implements DeviceListener { + + @Override + public boolean isRelevant(DeviceEvent event) { + return event.subject().type() == Device.Type.SWITCH; + } + + private boolean isRelevantHelper() { + return Objects.equals(localNode, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(DeviceEvent event) { + Device device = event.subject(); + + switch (event.type()) { + case DEVICE_AVAILABILITY_CHANGED: + case DEVICE_ADDED: + eventExecutor.execute(() -> { + + if (!isRelevantHelper()) { + return; + } + + K8sNode k8sNode = k8sNodeService.node(device.id()); + + if (k8sNode == null) { + return; + } + + if (deviceService.isAvailable(device.id())) { + log.debug("Integration bridge created on {}", k8sNode.hostname()); + bootstrapNode(k8sNode); + } else if (k8sNode.state() == COMPLETE) { + log.info("Device {} disconnected", device.id()); + setState(k8sNode, INCOMPLETE); + } + + if (autoRecovery) { + if (k8sNode.state() == INCOMPLETE || + k8sNode.state() == DEVICE_CREATED) { + log.info("Device {} is reconnected", device.id()); + k8sNodeAdminService.updateNode( + k8sNode.updateState(K8sNodeState.INIT)); + } + } + }); + break; + case PORT_UPDATED: + case PORT_ADDED: + eventExecutor.execute(() -> { + + if (!isRelevantHelper()) { + return; + } + + K8sNode k8sNode = k8sNodeService.node(device.id()); + + if (k8sNode == null) { + return; + } + + Port port = event.port(); + String portName = port.annotations().value(PORT_NAME); + if (k8sNode.state() == DEVICE_CREATED && ( + Objects.equals(portName, VXLAN_TUNNEL) || + Objects.equals(portName, GRE_TUNNEL) || + Objects.equals(portName, GENEVE_TUNNEL))) { + log.info("Interface {} added or updated to {}", + portName, device.id()); + bootstrapNode(k8sNode); + } + }); + break; + case PORT_REMOVED: + eventExecutor.execute(() -> { + + if (!isRelevantHelper()) { + return; + } + + K8sNode k8sNode = k8sNodeService.node(device.id()); + + if (k8sNode == null) { + return; + } + + Port port = event.port(); + String portName = port.annotations().value(PORT_NAME); + if (k8sNode.state() == COMPLETE && ( + Objects.equals(portName, VXLAN_TUNNEL) || + Objects.equals(portName, GRE_TUNNEL) || + Objects.equals(portName, GENEVE_TUNNEL))) { + log.warn("Interface {} removed from {}", + portName, event.subject().id()); + setState(k8sNode, INCOMPLETE); + } + }); + break; + case DEVICE_REMOVED: + default: + // do nothing + break; + } + } + } + + /** + * An internal kubernetes node listener. + * The notification is triggered by KubernetesNodeStore. + */ + private class InternalK8sNodeListener implements K8sNodeListener { + + private boolean isRelevantHelper() { + return Objects.equals(localNode, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(K8sNodeEvent event) { + switch (event.type()) { + case K8S_NODE_CREATED: + case K8S_NODE_UPDATED: + eventExecutor.execute(() -> { + + if (!isRelevantHelper()) { + return; + } + + bootstrapNode(event.subject()); + }); + break; + case K8S_NODE_REMOVED: + eventExecutor.execute(() -> { + + if (!isRelevantHelper()) { + return; + } + + processK8sNodeRemoved(event.subject()); + }); + break; + case K8S_NODE_INCOMPLETE: + default: + break; + } + } + } +} diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java new file mode 100644 index 0000000000..c0a158ed26 --- /dev/null +++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java @@ -0,0 +1,149 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snode.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.net.Device; +import org.onosproject.net.behaviour.BridgeConfig; +import org.onosproject.net.behaviour.BridgeName; +import org.onosproject.net.device.DeviceService; +import org.onosproject.ovsdb.controller.OvsdbClientService; +import org.onosproject.ovsdb.controller.OvsdbController; +import org.onosproject.ovsdb.controller.OvsdbNodeId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Dictionary; + +import static org.onlab.util.Tools.get; + +/** + * An utility that used in kubernetes node app. + */ +public final class K8sNodeUtil { + private static final Logger log = LoggerFactory.getLogger(K8sNodeUtil.class); + + /** + * Prevents object installation from external. + */ + private K8sNodeUtil() { + } + + /** + * Checks whether the controller has a connection with an OVSDB that resides + * inside the given kubernetes node. + * + * @param node kubernetes node + * @param ovsdbPort OVSDB port + * @param ovsdbController OVSDB controller + * @param deviceService device service + * @return true if the controller is connected to the OVSDB, false otherwise + */ + public static boolean isOvsdbConnected(K8sNode node, + int ovsdbPort, + OvsdbController ovsdbController, + DeviceService deviceService) { + OvsdbClientService client = getOvsdbClient(node, ovsdbPort, ovsdbController); + return deviceService.isAvailable(node.ovsdb()) && + client != null && + client.isConnected(); + } + + /** + * Gets the ovsdb client with supplied openstack node. + * + * @param node kubernetes node + * @param ovsdbPort ovsdb port + * @param ovsdbController ovsdb controller + * @return ovsdb client + */ + public static OvsdbClientService getOvsdbClient(K8sNode node, + int ovsdbPort, + OvsdbController ovsdbController) { + OvsdbNodeId ovsdb = new OvsdbNodeId(node.managementIp(), ovsdbPort); + return ovsdbController.getOvsdbClient(ovsdb); + } + + /** + * Adds or removes a network interface (aka port) into a given bridge of kubernetes node. + * + * @param k8sNode kubernetes node + * @param bridgeName bridge name + * @param intfName interface name + * @param deviceService device service + * @param addOrRemove add port is true, remove it otherwise + */ + public static synchronized void addOrRemoveSystemInterface(K8sNode k8sNode, + String bridgeName, + String intfName, + DeviceService deviceService, + boolean addOrRemove) { + + + Device device = deviceService.getDevice(k8sNode.ovsdb()); + if (device == null || !device.is(BridgeConfig.class)) { + log.info("device is null or this device if not ovsdb device"); + return; + } + BridgeConfig bridgeConfig = device.as(BridgeConfig.class); + + if (addOrRemove) { + bridgeConfig.addPort(BridgeName.bridgeName(bridgeName), intfName); + } else { + bridgeConfig.deletePort(BridgeName.bridgeName(bridgeName), intfName); + } + } + + /** + * Gets Boolean property from the propertyName + * Return null if propertyName is not found. + * + * @param properties properties to be looked up + * @param propertyName the name of the property to look up + * @return value when the propertyName is defined or return null + */ + public static Boolean getBooleanProperty(Dictionary properties, + String propertyName) { + Boolean value; + try { + String s = get(properties, propertyName); + value = Strings.isNullOrEmpty(s) ? null : Boolean.valueOf(s); + } catch (ClassCastException e) { + value = null; + } + return value; + } + + /** + * Prints out the JSON string in pretty format. + * + * @param mapper Object mapper + * @param jsonString JSON string + * @return pretty formatted JSON string + */ + public static String prettyJson(ObjectMapper mapper, String jsonString) { + try { + Object jsonObject = mapper.readValue(jsonString, Object.class); + return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonObject); + } catch (IOException e) { + log.debug("Json string parsing exception caused by {}", e); + } + return null; + } +} diff --git a/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json b/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json index 34823d2f4d..435161697a 100644 --- a/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json +++ b/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json @@ -13,7 +13,7 @@ "type", "managementIp", "dataIp", - "integrationBridge", + "integrationBridge" ], "properties": { "hostname": { diff --git a/apps/k8s-node/network-cfg.json b/apps/k8s-node/network-cfg.json new file mode 100644 index 0000000000..5c798d1713 --- /dev/null +++ b/apps/k8s-node/network-cfg.json @@ -0,0 +1,19 @@ +{ + "nodes": [ + { + "hostname": "minion-01", + "type": "MINION", + "managementIp": "172.16.130.4", + "dataIp": "172.16.130.4", + "integrationBridge": "of:00000000000000a1" + }, + { + "hostname": "minion-02", + "type": "MINION", + "managementIp": "172.16.130.6", + "dataIp": "172.16.130.6", + "integrationBridge": "of:00000000000000a2" + } + ] +} +