Purge the tunnel and router bridge on removing kubernetes host

Change-Id: I8743b1064b29c76fe96bc4dbfa267f425430a96b
(cherry picked from commit 32a28ad698cacb5cbdc454f2793c6e46f0ec1658)
This commit is contained in:
Jian Li 2020-12-01 00:35:50 +09:00
parent 8fa74cfaa2
commit 01613c2e20
13 changed files with 94 additions and 211 deletions

View File

@ -332,52 +332,6 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
applyRule(flowRule, true);
}
private void purgeAnyRoutingRule(K8sNode localNode) {
K8sNetwork k8sNetwork = k8sNetworkService.network(localNode.hostname());
IpPrefix srcIpPrefix = IpPrefix.valueOf(k8sNetwork.gatewayIp(), HOST_PREFIX);
IpPrefix dstIpPrefix = IpPrefix.valueOf(k8sNetwork.cidr());
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPSrc(srcIpPrefix)
.matchIPDst(dstIpPrefix);
for (K8sNode node : k8sNodeService.nodes()) {
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setTunnelId(Long.valueOf(k8sNetwork.segmentId()));
if (node.hostname().equals(k8sNetwork.name())) {
tBuilder.transition(STAT_EGRESS_TABLE);
} else {
tBuilder.setOutput(node.intgToTunPortNum());
// install flows into tunnel bridge
PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(),
k8sNetworkService, node);
TrafficTreatment treatmentToRemote = DefaultTrafficTreatment.builder()
.extension(buildExtension(
deviceService,
node.tunBridge(),
localNode.dataIp().getIp4Address()),
node.tunBridge())
.setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
.setOutput(portNum)
.build();
FlowRule remoteFlowRule = DefaultFlowRule.builder()
.forDevice(node.tunBridge())
.withSelector(sBuilder.build())
.withTreatment(treatmentToRemote)
.withPriority(PRIORITY_CIDR_RULE)
.fromApp(appId)
.makePermanent()
.forTable(TUN_ENTRY_TABLE)
.build();
applyRule(remoteFlowRule, false);
}
}
}
private void setAnyRoutingRule(IpPrefix srcIpPrefix, MacAddress mac,
K8sNetwork k8sNetwork) {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
@ -452,9 +406,6 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
case K8S_NODE_COMPLETE:
deviceEventExecutor.execute(() -> processNodeCompletion(event.subject()));
break;
case K8S_NODE_OFF_BOARDED:
deviceEventExecutor.execute(() -> processNodeOffboard(event.subject()));
break;
case K8S_NODE_CREATED:
default:
// do nothing
@ -481,16 +432,6 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
}
}
}
private void processNodeOffboard(K8sNode node) {
log.info("Offboarded node {} is detected", node.hostname());
if (!isRelevantHelper()) {
return;
}
purgeAnyRoutingRule(node);
}
}
private class InternalK8sNetworkListener implements K8sNetworkListener {

View File

@ -400,9 +400,6 @@ public class K8sNodePortHandler {
case K8S_NODE_COMPLETE:
eventExecutor.execute(() -> processNodeCompletion(event.subject()));
break;
case K8S_NODE_OFF_BOARDED:
eventExecutor.execute(() -> processNodeOffboard(event.subject()));
break;
case K8S_NODE_INCOMPLETE:
default:
break;
@ -435,9 +432,5 @@ public class K8sNodePortHandler {
setIntgToExtRules(updatedNode, getServiceCidr(), true);
setTunToIntgRules(updatedNode, true);
}
private void processNodeOffboard(K8sNode k8sNode) {
setTunToIntgRules(k8sNode, false);
}
}
}

View File

@ -983,9 +983,6 @@ public class K8sServiceHandler {
case K8S_NODE_COMPLETE:
eventExecutor.execute(() -> processNodeCompletion(k8sNode));
break;
case K8S_NODE_OFF_BOARDED:
eventExecutor.execute(() -> processNodeOffboard(k8sNode));
break;
case K8S_NODE_INCOMPLETE:
default:
break;
@ -1001,15 +998,6 @@ public class K8sServiceHandler {
k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
}
private void processNodeOffboard(K8sNode node) {
if (!isRelevantHelper()) {
return;
}
K8sNetwork network = k8sNetworkService.network(node.hostname());
setupServiceDefaultRule(network, false);
}
}
private class InternalK8sNetworkListener implements K8sNetworkListener {

View File

@ -227,44 +227,6 @@ public class K8sSwitchingGatewayHandler {
}
}
private void setGatewayTunnelRule(K8sNode node, boolean install) {
K8sNetwork k8sNetwork = k8sNetworkService.network(node.hostname());
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(),
HOST_PREFIX));
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
K8sNode localNode = k8sNodeService.node(k8sNetwork.name());
tBuilder.setOutput(node.intgToTunPortNum());
// install flows into tunnel bridge
PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(),
k8sNetworkService, node);
TrafficTreatment treatmentToRemote = DefaultTrafficTreatment.builder()
.extension(buildExtension(
deviceService,
node.tunBridge(),
localNode.dataIp().getIp4Address()),
node.tunBridge())
.setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
.setOutput(portNum)
.build();
k8sFlowRuleService.setRule(
appId,
node.tunBridge(),
sBuilder.build(),
treatmentToRemote,
PRIORITY_GATEWAY_RULE,
TUN_ENTRY_TABLE,
install);
}
private void setInterNodeRoutingRules(K8sNode srcNode, boolean install) {
if (srcNode == null) {
return;
@ -491,9 +453,6 @@ public class K8sSwitchingGatewayHandler {
case K8S_NODE_COMPLETE:
eventExecutor.execute(() -> processNodeCompletion(event.subject()));
break;
case K8S_NODE_OFF_BOARDED:
eventExecutor.execute(() -> processNodeOffboard(event.subject()));
break;
case K8S_NODE_INCOMPLETE:
default:
break;
@ -513,14 +472,5 @@ public class K8sSwitchingGatewayHandler {
setInterNodeRoutingRules(node, true);
}
private void processNodeOffboard(K8sNode node) {
if (!isRelevantHelper()) {
return;
}
setGatewayTunnelRule(node, false);
setInterNodeRoutingRules(node, false);
}
}
}

View File

@ -497,8 +497,6 @@ public class K8sSwitchingHandler {
case K8S_NODE_COMPLETE:
eventExecutor.execute(() -> processNodeCompletion(event.subject()));
break;
case K8S_NODE_OFF_BOARDED:
eventExecutor.execute(() -> processNodeOffboard(event.subject()));
default:
break;
}
@ -513,13 +511,5 @@ public class K8sSwitchingHandler {
setLocalTunnelTagFlowRules(k8sNode, true);
setRulesForTunnelBridge(k8sNode, true);
}
private void processNodeOffboard(K8sNode k8sNode) {
if (!isRelevantHelper()) {
return;
}
setRulesForTunnelBridge(k8sNode, false);
}
}
}

View File

@ -16,6 +16,7 @@
package org.onosproject.k8snetworking.web;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.onlab.packet.IpAddress;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
import org.onosproject.k8snetworking.api.K8sIngressAdminService;
@ -23,10 +24,13 @@ import org.onosproject.k8snetworking.api.K8sNamespaceAdminService;
import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
import org.onosproject.k8snetworking.api.K8sNetworkPolicyAdminService;
import org.onosproject.k8snetworking.api.K8sPodAdminService;
import org.onosproject.k8snetworking.api.K8sPort;
import org.onosproject.k8snetworking.api.K8sServiceAdminService;
import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.k8snode.api.K8sHost;
import org.onosproject.k8snode.api.K8sHostAdminService;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeAdminService;
import org.onosproject.k8snode.api.K8sNodeState;
@ -34,11 +38,15 @@ import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.syncPortFromPod;
@ -53,9 +61,13 @@ import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
public class K8sManagementWebResource extends AbstractWebResource {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long MID_SLEEP_MS = 3000; // we wait 3s
private static final long SLEEP_MS = 10000; // we wait 10s
private static final long TIMEOUT_MS = 30000; // we wait 30s
private static final String MESSAGE_ALL = "Received all %s request";
private static final String REMOVE = "REMOVE";
private final K8sApiConfigService configService = get(K8sApiConfigService.class);
private final K8sPodAdminService podAdminService = get(K8sPodAdminService.class);
private final K8sNamespaceAdminService namespaceAdminService =
@ -70,6 +82,8 @@ public class K8sManagementWebResource extends AbstractWebResource {
get(K8sNetworkAdminService.class);
private final K8sNodeAdminService nodeAdminService =
get(K8sNodeAdminService.class);
private final K8sHostAdminService hostAdminService =
get(K8sHostAdminService.class);
private final K8sNetworkPolicyAdminService policyAdminService =
get(K8sNetworkPolicyAdminService.class);
@ -162,6 +176,64 @@ public class K8sManagementWebResource extends AbstractWebResource {
return ok(mapper().createObjectNode()).build();
}
/**
* Removes all nodes and hosts.
*
* @return 204 NO_CONTENT, 400 BAD_REQUEST if the JSON is malformed, and
* 304 NOT_MODIFIED without the updated config
*/
@DELETE
@Path("purge/all")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response purgeAll() {
log.trace(String.format(MESSAGE_ALL, REMOVE));
Set<String> portIds = networkAdminService.ports().stream().map(K8sPort::portId).collect(Collectors.toSet());
portIds.forEach(networkAdminService::removePort);
try {
sleep(MID_SLEEP_MS);
} catch (InterruptedException e) {
log.error("Exception caused during node synchronization...");
}
Set<String> masters = nodeAdminService.nodes(K8sNode.Type.MASTER).stream()
.map(K8sNode::hostname).collect(Collectors.toSet());
Set<String> workers = nodeAdminService.nodes(K8sNode.Type.MINION).stream()
.map(K8sNode::hostname).collect(Collectors.toSet());
for (String hostname : workers) {
nodeAdminService.removeNode(hostname);
try {
sleep(MID_SLEEP_MS);
} catch (InterruptedException e) {
log.error("Exception caused during node synchronization...");
}
}
for (String hostname : masters) {
nodeAdminService.removeNode(hostname);
try {
sleep(MID_SLEEP_MS);
} catch (InterruptedException e) {
log.error("Exception caused during node synchronization...");
}
}
Set<IpAddress> allHosts = hostAdminService.hosts().stream().map(K8sHost::hostIp).collect(Collectors.toSet());
for (IpAddress hostIp : allHosts) {
hostAdminService.removeHost(hostIp);
try {
sleep(MID_SLEEP_MS);
} catch (InterruptedException e) {
log.error("Exception caused during node synchronization...");
}
}
return Response.noContent().build();
}
private void syncRulesBase() {
nodeAdminService.completeNodes(MASTER).forEach(this::syncRulesBaseForNode);
nodeAdminService.completeNodes(MINION).forEach(this::syncRulesBaseForNode);

View File

@ -126,17 +126,4 @@ public class K8sPortWebResource extends AbstractWebResource {
adminService.removePort(id);
return Response.noContent().build();
}
/**
* Removes the port with the given id.
*
* @return 204 NO_CONTENT, 400 BAD_REQUEST if the port does not exist
*/
@DELETE
public Response removeAllPorts() {
adminService.ports().stream()
.map(K8sPort::portId)
.forEach(adminService::removePort);
return Response.noContent().build();
}
}

View File

@ -75,11 +75,4 @@ public interface K8sNodeHandler {
* @param k8sNode kubernetes node
*/
void processPostOnBoardState(K8sNode k8sNode);
/**
* Processes the given node for off boarded state.
*
* @param k8sNode kubernetes node
*/
void processOffBoardedState(K8sNode k8sNode);
}

View File

@ -117,20 +117,6 @@ public enum K8sNodeState {
public K8sNodeState nextState() {
return INIT;
}
},
/**
* Indicates node is removed.
*/
OFF_BOARDED {
@Override
public void process(K8sNodeHandler handler, K8sNode node) {
handler.processOffBoardedState(node);
}
@Override
public K8sNodeState nextState() {
return OFF_BOARDED;
}
};
/**

View File

@ -530,6 +530,20 @@ public class DefaultK8sHostHandler implements K8sHostHandler {
}
}
private void processHostRemoval(K8sHost k8sHost) {
OvsdbClientService client = getOvsdbClient(k8sHost, ovsdbPortNum, ovsdbController);
if (client == null) {
log.info("Failed to get ovsdb client");
return;
}
// delete tunnel bridge from the host
k8sHost.tunBridges().forEach(br -> client.dropBridge(br.name()));
// delete router bridge from the host
k8sHost.routerBridges().forEach(br -> client.dropBridge(br.name()));
}
private class InternalOvsdbListener implements DeviceListener {
@Override
@ -715,6 +729,14 @@ public class DefaultK8sHostHandler implements K8sHostHandler {
});
break;
case K8S_HOST_REMOVED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
processHostRemoval(event.subject());
});
break;
case K8S_HOST_INCOMPLETE:
default:
break;

View File

@ -262,11 +262,6 @@ public class DefaultK8sNodeHandler implements K8sNodeHandler {
// do something if needed
}
@Override
public void processOffBoardedState(K8sNode k8sNode) {
// do something if needed
}
/**
* Extracts properties from the component configuration context.
*

View File

@ -185,11 +185,6 @@ public class DistributedK8sNodeStore
K8S_NODE_INCOMPLETE,
event.newValue().value()
));
} else if (event.newValue().value().state() == OFF_BOARDED) {
notifyDelegate(new K8sNodeEvent(
K8S_NODE_OFF_BOARDED,
event.newValue().value()
));
}
});
break;

View File

@ -52,7 +52,6 @@ import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
import static javax.ws.rs.core.Response.created;
import static org.onlab.util.Tools.nullIsIllegal;
import static org.onlab.util.Tools.readTreeFromStream;
import static org.onosproject.k8snode.api.K8sNodeState.OFF_BOARDED;
import static org.onosproject.k8snode.api.K8sNodeState.POST_ON_BOARD;
import static org.onosproject.k8snode.util.K8sNodeUtil.endpoint;
@ -178,34 +177,6 @@ public class K8sNodeWebResource extends AbstractWebResource {
return Response.noContent().build();
}
/**
* Off-board a kubernetes node.
*
* @param hostname host name contained in kubernetes nodes configuration
* @return 200 OK with the updated kubernetes node's config, 400 BAD_REQUEST
* if the JSON is malformed, and 304 NOT_MODIFIED without the updated config
*/
@PUT
@Path("node/offboard/{hostname}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response offboardNode(@PathParam("hostname") String hostname) {
log.trace(String.format(MESSAGE_NODE, UPDATE));
K8sNode existing = nodeAdminService.node(
nullIsIllegal(hostname, HOST_NAME + ERROR_MESSAGE));
if (existing == null) {
log.warn("There is no node configuration to offboard : {}", hostname);
return Response.notModified().build();
} else {
K8sNode updated = existing.updateState(OFF_BOARDED);
nodeAdminService.updateNode(updated);
}
return Response.ok().build();
}
/**
* Obtains the state of the kubernetes node.
*