[ONOS-7902] Add node handler and a set of CLIs for kubernetes node

Change-Id: Iee4a88e4af437d551a38342de339455387389f61
This commit is contained in:
Jian Li 2019-01-22 22:55:31 +09:00
parent 49109b5cd4
commit f16e8850b3
11 changed files with 1168 additions and 3 deletions

View File

@ -23,7 +23,7 @@ public final class Constants {
private Constants() {
}
public static final String INTEGRATION_BRIDGE = "br-int";
public static final String INTEGRATION_BRIDGE = "kbr-int";
public static final String VXLAN_TUNNEL = "vxlan";
public static final String GRE_TUNNEL = "gre";
public static final String GENEVE_TUNNEL = "geneve";

View File

@ -47,7 +47,7 @@ public interface K8sNodeHandler {
/**
* Processes the given node for incomplete state.
*
* @param k8sNode kubernete node
* @param k8sNode kubernetes node
*/
void processIncompleteState(K8sNode k8sNode);
}

View File

@ -1,5 +1,7 @@
COMPILE_DEPS = CORE_DEPS + JACKSON + KRYO + CLI + REST + [
"//core/store/serializers:onos-core-serializers",
"//protocols/ovsdb/api:onos-protocols-ovsdb-api",
"//protocols/ovsdb/rfc:onos-protocols-ovsdb-rfc",
"//apps/k8s-node/api:onos-apps-k8s-node-api",
]
@ -10,7 +12,12 @@ TEST_DEPS = TEST_ADAPTERS + TEST_REST + [
]
osgi_jar_with_tests(
api_description = "REST API for Kubernetes Node",
api_package = "org.onosproject.k8snode.web",
api_title = "Kubernetes Node API",
api_version = "1.0",
karaf_command_packages = ["org.onosproject.k8snode.cli"],
test_deps = TEST_DEPS,
web_context = "/onos/k8snode",
deps = COMPILE_DEPS,
)

View File

@ -0,0 +1,52 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snode.cli;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.apache.karaf.shell.api.console.CommandLine;
import org.apache.karaf.shell.api.console.Completer;
import org.apache.karaf.shell.api.console.Session;
import org.apache.karaf.shell.support.completers.StringsCompleter;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeService;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import static org.onosproject.cli.AbstractShellCommand.get;
/**
* Kubernetes host completer.
*/
@Service
public class K8sHostnameCompleter implements Completer {
@Override
public int complete(Session session, CommandLine commandLine, List<String> candidates) {
StringsCompleter delegate = new StringsCompleter();
K8sNodeService nodeService = get(K8sNodeService.class);
Set<String> hostnames = nodeService.nodes().stream()
.map(K8sNode::hostname)
.collect(Collectors.toSet());
SortedSet<String> strings = delegate.getStrings();
strings.addAll(hostnames);
return delegate.complete(session, commandLine, candidates);
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snode.cli;
import org.apache.karaf.shell.api.action.Argument;
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.Completion;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.device.DeviceService;
import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
/**
* Checks detailed node init state.
*/
@Service
@Command(scope = "onos", name = "k8s-node-check",
description = "Shows detailed kubernetes node init state")
public class K8sNodeCheckCommand extends AbstractShellCommand {
@Argument(index = 0, name = "hostname", description = "Hostname",
required = true, multiValued = false)
@Completion(K8sHostnameCompleter.class)
private String hostname = null;
private static final String MSG_OK = "OK";
private static final String MSG_ERROR = "ERROR";
@Override
protected void doExecute() {
K8sNodeService nodeService = get(K8sNodeService.class);
DeviceService deviceService = get(DeviceService.class);
K8sNode node = nodeService.node(hostname);
if (node == null) {
print("Cannot find %s from registered nodes", hostname);
return;
}
print("[Integration Bridge Status]");
Device device = deviceService.getDevice(node.intgBridge());
if (device != null) {
print("%s %s=%s available=%s %s",
deviceService.isAvailable(device.id()) ? MSG_OK : MSG_ERROR,
INTEGRATION_BRIDGE,
device.id(),
deviceService.isAvailable(device.id()),
device.annotations());
if (node.dataIp() != null) {
printPortState(deviceService, node.intgBridge(), VXLAN_TUNNEL);
printPortState(deviceService, node.intgBridge(), GRE_TUNNEL);
printPortState(deviceService, node.intgBridge(), GENEVE_TUNNEL);
}
} else {
print("%s %s=%s is not available",
MSG_ERROR,
INTEGRATION_BRIDGE,
node.intgBridge());
}
}
private void printPortState(DeviceService deviceService,
DeviceId deviceId, String portName) {
Port port = deviceService.getPorts(deviceId).stream()
.filter(p -> p.annotations().value(PORT_NAME).equals(portName) &&
p.isEnabled())
.findAny().orElse(null);
if (port != null) {
print("%s %s portNum=%s enabled=%s %s",
port.isEnabled() ? MSG_OK : MSG_ERROR,
portName,
port.number(),
port.isEnabled() ? Boolean.TRUE : Boolean.FALSE,
port.annotations());
} else {
print("%s %s does not exist", MSG_ERROR, portName);
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snode.cli;
import org.apache.karaf.shell.api.action.Argument;
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.Completion;
import org.apache.karaf.shell.api.action.Option;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeAdminService;
import org.onosproject.k8snode.api.K8sNodeService;
import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
import static org.onosproject.k8snode.api.K8sNodeState.INIT;
/**
* Initializes nodes for node service.
*/
@Service
@Command(scope = "onos", name = "k8s-node-init",
description = "Initializes nodes for kubernetes node service")
public class K8sNodeInitCommand extends AbstractShellCommand {
@Option(name = "-a", aliases = "--all", description = "Apply this command to all nodes",
required = false, multiValued = false)
private boolean isAll = false;
@Option(name = "-i", aliases = "--incomplete",
description = "Apply this command to incomplete nodes",
required = false, multiValued = false)
private boolean isIncomplete = false;
@Argument(index = 0, name = "hostnames", description = "Hostname(s) to apply this command",
required = false, multiValued = true)
@Completion(K8sHostnameCompleter.class)
private String[] hostnames = null;
@Override
protected void doExecute() {
K8sNodeService nodeService = get(K8sNodeService.class);
K8sNodeAdminService nodeAdminService = get(K8sNodeAdminService.class);
if ((!isAll && !isIncomplete && hostnames == null) ||
(isAll && isIncomplete) ||
(isIncomplete && hostnames != null) ||
(hostnames != null && isAll)) {
print("Please specify one of hostname, --all, and --incomplete options.");
return;
}
if (isAll) {
hostnames = nodeService.nodes().stream()
.map(K8sNode::hostname).toArray(String[]::new);
} else if (isIncomplete) {
hostnames = nodeService.nodes().stream()
.filter(node -> node.state() != COMPLETE)
.map(K8sNode::hostname).toArray(String[]::new);
}
for (String hostname : hostnames) {
K8sNode node = nodeService.node(hostname);
if (node == null) {
print("Unable to find %s", hostname);
continue;
}
print("Initializing %s", hostname);
K8sNode updated = node.updateState(INIT);
nodeAdminService.updateNode(updated);
}
print("Done.");
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snode.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Lists;
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeService;
import java.util.Comparator;
import java.util.List;
import static org.onosproject.k8snode.util.K8sNodeUtil.prettyJson;
/**
* Lists all nodes registered to the service.
*/
@Service
@Command(scope = "onos", name = "k8s-nodes",
description = "Lists all nodes registered in kubernetes node service")
public class K8sNodeListCommand extends AbstractShellCommand {
private static final String FORMAT = "%-20s%-15s%-24s%-24s%-20s%-15s";
@Override
protected void doExecute() {
K8sNodeService nodeService = get(K8sNodeService.class);
List<K8sNode> nodes = Lists.newArrayList(nodeService.nodes());
nodes.sort(Comparator.comparing(K8sNode::hostname));
if (outputJson()) {
print("%s", json(nodes));
} else {
print(FORMAT, "Hostname", "Type", "Integration Bridge",
"Management IP", "Data IP", "State");
for (K8sNode node : nodes) {
print(FORMAT,
node.hostname(),
node.type(),
node.intgBridge(),
node.managementIp(),
node.dataIp() != null ? node.dataIp() : "",
node.state());
}
print("Total %s nodes", nodeService.nodes().size());
}
}
private String json(List<K8sNode> nodes) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode result = mapper.createArrayNode();
for (K8sNode node : nodes) {
result.add(jsonForEntity(node, K8sNode.class));
}
return prettyJson(mapper, result.toString());
}
}

View File

@ -0,0 +1,674 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snode.impl;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeAdminService;
import org.onosproject.k8snode.api.K8sNodeEvent;
import org.onosproject.k8snode.api.K8sNodeHandler;
import org.onosproject.k8snode.api.K8sNodeListener;
import org.onosproject.k8snode.api.K8sNodeService;
import org.onosproject.k8snode.api.K8sNodeState;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.behaviour.BridgeConfig;
import org.onosproject.net.behaviour.BridgeDescription;
import org.onosproject.net.behaviour.ControllerInfo;
import org.onosproject.net.behaviour.DefaultBridgeDescription;
import org.onosproject.net.behaviour.DefaultTunnelDescription;
import org.onosproject.net.behaviour.InterfaceConfig;
import org.onosproject.net.behaviour.TunnelDescription;
import org.onosproject.net.behaviour.TunnelEndPoints;
import org.onosproject.net.behaviour.TunnelKeys;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.ovsdb.controller.OvsdbClientService;
import org.onosproject.ovsdb.controller.OvsdbController;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import java.util.Dictionary;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.packet.TpPort.tpPort;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snode.api.Constants.GENEVE;
import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
import static org.onosproject.k8snode.api.Constants.GRE;
import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
import static org.onosproject.k8snode.api.Constants.VXLAN;
import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Service bootstraps kubernetes node based on its type.
*/
@Component(immediate = true,
property = {
OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
}
)
public class DefaultK8sNodeHandler implements K8sNodeHandler {
private final Logger log = getLogger(getClass());
private static final String DEFAULT_OF_PROTO = "tcp";
private static final int DEFAULT_OFPORT = 6653;
private static final int DPID_BEGIN = 3;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceAdminService deviceAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OvsdbController ovsdbController;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sNodeService k8sNodeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sNodeAdminService k8sNodeAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
/** OVSDB server listen port. */
private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
/** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final DeviceListener ovsdbListener = new InternalOvsdbListener();
private final DeviceListener bridgeListener = new InternalBridgeListener();
private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
private ApplicationId appId;
private NodeId localNode;
@Activate
protected void activate() {
appId = coreService.getAppId(APP_ID);
localNode = clusterService.getLocalNode().id();
componentConfigService.registerProperties(getClass());
leadershipService.runForLeadership(appId.name());
deviceService.addListener(ovsdbListener);
deviceService.addListener(bridgeListener);
k8sNodeService.addListener(k8sNodeListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
k8sNodeService.removeListener(k8sNodeListener);
deviceService.removeListener(bridgeListener);
deviceService.removeListener(ovsdbListener);
componentConfigService.unregisterProperties(getClass(), false);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
@Modified
protected void modified(ComponentContext context) {
readComponentConfiguration(context);
log.info("Modified");
}
@Override
public void processInitState(K8sNode k8sNode) {
if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
return;
}
if (!deviceService.isAvailable(k8sNode.intgBridge())) {
createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
}
}
@Override
public void processDeviceCreatedState(K8sNode k8sNode) {
try {
if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
return;
}
if (k8sNode.dataIp() != null &&
!isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
createVxlanTunnelInterface(k8sNode);
}
if (k8sNode.dataIp() != null &&
!isIntfEnabled(k8sNode, GRE_TUNNEL)) {
createGreTunnelInterface(k8sNode);
}
if (k8sNode.dataIp() != null &&
!isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
createGeneveTunnelInterface(k8sNode);
}
} catch (Exception e) {
log.error("Exception occurred because of {}", e);
}
}
@Override
public void processCompleteState(K8sNode k8sNode) {
// do something if needed
}
@Override
public void processIncompleteState(K8sNode k8sNode) {
// do something if needed
}
/**
* Extracts properties from the component configuration context.
*
* @param context the component context
*/
private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
if (ovsdbPortConfigured == null) {
ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
} else {
ovsdbPortNum = ovsdbPortConfigured;
log.info("Configured. OVSDB port is {}", ovsdbPortNum);
}
Boolean autoRecoveryConfigured =
getBooleanProperty(properties, AUTO_RECOVERY);
if (autoRecoveryConfigured == null) {
autoRecovery = AUTO_RECOVERY_DEFAULT;
log.info("Auto recovery flag is NOT " +
"configured, default value is {}", autoRecovery);
} else {
autoRecovery = autoRecoveryConfigured;
log.info("Configured. Auto recovery flag is {}", autoRecovery);
}
}
/**
* Creates a bridge with a given name on a given kubernetes node.
*
* @param k8sNode kubernetes node
* @param bridgeName bridge name
* @param devId device identifier
*/
private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
Device device = deviceService.getDevice(k8sNode.ovsdb());
List<ControllerInfo> controllers = clusterService.getNodes().stream()
.map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
.collect(Collectors.toList());
String dpid = devId.toString().substring(DPID_BEGIN);
BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
.name(bridgeName)
.failMode(BridgeDescription.FailMode.SECURE)
.datapathId(dpid)
.disableInBand()
.controllers(controllers);
BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
bridgeConfig.addBridge(builder.build());
}
/**
* Creates a VXLAN tunnel interface in a given kubernetes node.
*
* @param k8sNode kubernetes node
*/
private void createVxlanTunnelInterface(K8sNode k8sNode) {
createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
}
/**
* Creates a GRE tunnel interface in a given kubernetes node.
*
* @param k8sNode kubernetes node
*/
private void createGreTunnelInterface(K8sNode k8sNode) {
createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
}
/**
* Creates a GENEVE tunnel interface in a given kubernetes node.
*
* @param k8sNode kubernetes node
*/
private void createGeneveTunnelInterface(K8sNode k8sNode) {
createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
}
/**
* Creates a tunnel interface in a given kubernetes node.
*
* @param k8sNode kubernetes node
*/
private void createTunnelInterface(K8sNode k8sNode,
String type, String intfName) {
if (isIntfEnabled(k8sNode, intfName)) {
return;
}
Device device = deviceService.getDevice(k8sNode.ovsdb());
if (device == null || !device.is(InterfaceConfig.class)) {
log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
return;
}
TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
ifaceConfig.addTunnelMode(intfName, tunnelDesc);
}
/**
* Builds tunnel description according to the network type.
*
* @param type network type
* @return tunnel description
*/
private TunnelDescription buildTunnelDesc(String type, String intfName) {
if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
TunnelDescription.Builder tdBuilder =
DefaultTunnelDescription.builder()
.deviceId(INTEGRATION_BRIDGE)
.ifaceName(intfName)
.remote(TunnelEndPoints.flowTunnelEndpoint())
.key(TunnelKeys.flowTunnelKey());
switch (type) {
case VXLAN:
tdBuilder.type(TunnelDescription.Type.VXLAN);
break;
case GRE:
tdBuilder.type(TunnelDescription.Type.GRE);
break;
case GENEVE:
tdBuilder.type(TunnelDescription.Type.GENEVE);
break;
default:
return null;
}
return tdBuilder.build();
}
return null;
}
/**
* Checks whether a given network interface in a given kubernetes node
* is enabled or not.
*
* @param k8sNode kubernetes node
* @param intf network interface name
* @return true if the given interface is enabled, false otherwise
*/
private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
return deviceService.isAvailable(k8sNode.intgBridge()) &&
deviceService.getPorts(k8sNode.intgBridge()).stream()
.anyMatch(port -> Objects.equals(
port.annotations().value(PORT_NAME), intf) &&
port.isEnabled());
}
/**
* Checks whether all requirements for this state are fulfilled or not.
*
* @param k8sNode kubernetes node
* @return true if all requirements are fulfilled, false otherwise
*/
private boolean isCurrentStateDone(K8sNode k8sNode) {
switch (k8sNode.state()) {
case INIT:
if (!isOvsdbConnected(k8sNode, ovsdbPortNum,
ovsdbController, deviceService)) {
return false;
}
return deviceService.isAvailable(k8sNode.intgBridge());
case DEVICE_CREATED:
if (k8sNode.dataIp() != null &&
!isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
return false;
}
if (k8sNode.dataIp() != null &&
!isIntfEnabled(k8sNode, GRE_TUNNEL)) {
return false;
}
if (k8sNode.dataIp() != null &&
!isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
return false;
}
return true;
case COMPLETE:
case INCOMPLETE:
// always return false
// run init CLI to re-trigger node bootstrap
return false;
default:
return true;
}
}
/**
* Configures the kubernetes node with new state.
*
* @param k8sNode kubernetes node
* @param newState a new state
*/
private void setState(K8sNode k8sNode, K8sNodeState newState) {
if (k8sNode.state() == newState) {
return;
}
K8sNode updated = k8sNode.updateState(newState);
k8sNodeAdminService.updateNode(updated);
log.info("Changed {} state: {}", k8sNode.hostname(), newState);
}
/**
* Bootstraps a new kubernetes node.
*
* @param k8sNode kubernetes node
*/
private void bootstrapNode(K8sNode k8sNode) {
if (isCurrentStateDone(k8sNode)) {
setState(k8sNode, k8sNode.state().nextState());
} else {
log.trace("Processing {} state for {}", k8sNode.state(),
k8sNode.hostname());
k8sNode.state().process(this, k8sNode);
}
}
private void processK8sNodeRemoved(K8sNode k8sNode) {
OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
if (client == null) {
log.info("Failed to get ovsdb client");
return;
}
// delete integration bridge from the node
client.dropBridge(INTEGRATION_BRIDGE);
// disconnect ovsdb
client.disconnect();
}
/**
* An internal OVSDB listener. This listener is used for listening the
* network facing events from OVSDB device. If a new OVSDB device is detected,
* ONOS tries to bootstrap the kubernetes node.
*/
private class InternalOvsdbListener implements DeviceListener {
@Override
public boolean isRelevant(DeviceEvent event) {
return event.subject().type() == Device.Type.CONTROLLER;
}
private boolean isRelevantHelper() {
return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
}
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sNode k8sNode = k8sNodeService.node(device.id());
if (k8sNode == null) {
return;
}
if (deviceService.isAvailable(device.id())) {
log.debug("OVSDB {} detected", device.id());
bootstrapNode(k8sNode);
}
});
break;
case PORT_ADDED:
case PORT_REMOVED:
case DEVICE_REMOVED:
default:
// do nothing
break;
}
}
}
/**
* An internal integration bridge listener. This listener is used for
* listening the events from integration bridge. To listen the events from
* other types of bridge such as provider bridge or tunnel bridge, we need
* to augment K8sNodeService.node() method.
*/
private class InternalBridgeListener implements DeviceListener {
@Override
public boolean isRelevant(DeviceEvent event) {
return event.subject().type() == Device.Type.SWITCH;
}
private boolean isRelevantHelper() {
return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
}
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sNode k8sNode = k8sNodeService.node(device.id());
if (k8sNode == null) {
return;
}
if (deviceService.isAvailable(device.id())) {
log.debug("Integration bridge created on {}", k8sNode.hostname());
bootstrapNode(k8sNode);
} else if (k8sNode.state() == COMPLETE) {
log.info("Device {} disconnected", device.id());
setState(k8sNode, INCOMPLETE);
}
if (autoRecovery) {
if (k8sNode.state() == INCOMPLETE ||
k8sNode.state() == DEVICE_CREATED) {
log.info("Device {} is reconnected", device.id());
k8sNodeAdminService.updateNode(
k8sNode.updateState(K8sNodeState.INIT));
}
}
});
break;
case PORT_UPDATED:
case PORT_ADDED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sNode k8sNode = k8sNodeService.node(device.id());
if (k8sNode == null) {
return;
}
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (k8sNode.state() == DEVICE_CREATED && (
Objects.equals(portName, VXLAN_TUNNEL) ||
Objects.equals(portName, GRE_TUNNEL) ||
Objects.equals(portName, GENEVE_TUNNEL))) {
log.info("Interface {} added or updated to {}",
portName, device.id());
bootstrapNode(k8sNode);
}
});
break;
case PORT_REMOVED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sNode k8sNode = k8sNodeService.node(device.id());
if (k8sNode == null) {
return;
}
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (k8sNode.state() == COMPLETE && (
Objects.equals(portName, VXLAN_TUNNEL) ||
Objects.equals(portName, GRE_TUNNEL) ||
Objects.equals(portName, GENEVE_TUNNEL))) {
log.warn("Interface {} removed from {}",
portName, event.subject().id());
setState(k8sNode, INCOMPLETE);
}
});
break;
case DEVICE_REMOVED:
default:
// do nothing
break;
}
}
}
/**
* An internal kubernetes node listener.
* The notification is triggered by KubernetesNodeStore.
*/
private class InternalK8sNodeListener implements K8sNodeListener {
private boolean isRelevantHelper() {
return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
}
@Override
public void event(K8sNodeEvent event) {
switch (event.type()) {
case K8S_NODE_CREATED:
case K8S_NODE_UPDATED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
bootstrapNode(event.subject());
});
break;
case K8S_NODE_REMOVED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
processK8sNodeRemoved(event.subject());
});
break;
case K8S_NODE_INCOMPLETE:
default:
break;
}
}
}
}

View File

@ -0,0 +1,149 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snode.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.net.Device;
import org.onosproject.net.behaviour.BridgeConfig;
import org.onosproject.net.behaviour.BridgeName;
import org.onosproject.net.device.DeviceService;
import org.onosproject.ovsdb.controller.OvsdbClientService;
import org.onosproject.ovsdb.controller.OvsdbController;
import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Dictionary;
import static org.onlab.util.Tools.get;
/**
* An utility that used in kubernetes node app.
*/
public final class K8sNodeUtil {
private static final Logger log = LoggerFactory.getLogger(K8sNodeUtil.class);
/**
* Prevents object installation from external.
*/
private K8sNodeUtil() {
}
/**
* Checks whether the controller has a connection with an OVSDB that resides
* inside the given kubernetes node.
*
* @param node kubernetes node
* @param ovsdbPort OVSDB port
* @param ovsdbController OVSDB controller
* @param deviceService device service
* @return true if the controller is connected to the OVSDB, false otherwise
*/
public static boolean isOvsdbConnected(K8sNode node,
int ovsdbPort,
OvsdbController ovsdbController,
DeviceService deviceService) {
OvsdbClientService client = getOvsdbClient(node, ovsdbPort, ovsdbController);
return deviceService.isAvailable(node.ovsdb()) &&
client != null &&
client.isConnected();
}
/**
* Gets the ovsdb client with supplied openstack node.
*
* @param node kubernetes node
* @param ovsdbPort ovsdb port
* @param ovsdbController ovsdb controller
* @return ovsdb client
*/
public static OvsdbClientService getOvsdbClient(K8sNode node,
int ovsdbPort,
OvsdbController ovsdbController) {
OvsdbNodeId ovsdb = new OvsdbNodeId(node.managementIp(), ovsdbPort);
return ovsdbController.getOvsdbClient(ovsdb);
}
/**
* Adds or removes a network interface (aka port) into a given bridge of kubernetes node.
*
* @param k8sNode kubernetes node
* @param bridgeName bridge name
* @param intfName interface name
* @param deviceService device service
* @param addOrRemove add port is true, remove it otherwise
*/
public static synchronized void addOrRemoveSystemInterface(K8sNode k8sNode,
String bridgeName,
String intfName,
DeviceService deviceService,
boolean addOrRemove) {
Device device = deviceService.getDevice(k8sNode.ovsdb());
if (device == null || !device.is(BridgeConfig.class)) {
log.info("device is null or this device if not ovsdb device");
return;
}
BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
if (addOrRemove) {
bridgeConfig.addPort(BridgeName.bridgeName(bridgeName), intfName);
} else {
bridgeConfig.deletePort(BridgeName.bridgeName(bridgeName), intfName);
}
}
/**
* Gets Boolean property from the propertyName
* Return null if propertyName is not found.
*
* @param properties properties to be looked up
* @param propertyName the name of the property to look up
* @return value when the propertyName is defined or return null
*/
public static Boolean getBooleanProperty(Dictionary<?, ?> properties,
String propertyName) {
Boolean value;
try {
String s = get(properties, propertyName);
value = Strings.isNullOrEmpty(s) ? null : Boolean.valueOf(s);
} catch (ClassCastException e) {
value = null;
}
return value;
}
/**
* Prints out the JSON string in pretty format.
*
* @param mapper Object mapper
* @param jsonString JSON string
* @return pretty formatted JSON string
*/
public static String prettyJson(ObjectMapper mapper, String jsonString) {
try {
Object jsonObject = mapper.readValue(jsonString, Object.class);
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonObject);
} catch (IOException e) {
log.debug("Json string parsing exception caused by {}", e);
}
return null;
}
}

View File

@ -13,7 +13,7 @@
"type",
"managementIp",
"dataIp",
"integrationBridge",
"integrationBridge"
],
"properties": {
"hostname": {

View File

@ -0,0 +1,19 @@
{
"nodes": [
{
"hostname": "minion-01",
"type": "MINION",
"managementIp": "172.16.130.4",
"dataIp": "172.16.130.4",
"integrationBridge": "of:00000000000000a1"
},
{
"hostname": "minion-02",
"type": "MINION",
"managementIp": "172.16.130.6",
"dataIp": "172.16.130.6",
"integrationBridge": "of:00000000000000a2"
}
]
}