Extend FPM module to handle routes from multiple peers.

This has the side-effect of fixing a bug when the same peer changes its
route advertisement for a particular prefix.

Change-Id: I09af3baf0a7741919be2a2986112db6db2556666
This commit is contained in:
Jonathan Hart 2017-05-02 16:36:26 -07:00
parent df5eeb13b3
commit b10f1e756c
6 changed files with 155 additions and 44 deletions

View File

@ -16,13 +16,17 @@
package org.onosproject.routing.fpm;
import java.net.SocketAddress;
import java.util.Map;
/**
* Created by jono on 2/2/16.
* Provides information about the FPM route receiver module.
*/
public interface FpmInfoService {
Map<SocketAddress, Long> peers();
/**
* Returns the FPM peers that are currently connected.
*
* @return a map of FPM peer to connection time.
*/
Map<FpmPeer, Long> peers();
}

View File

@ -18,8 +18,6 @@ package org.onosproject.routing.fpm;
import org.onosproject.routing.fpm.protocol.FpmHeader;
import java.net.SocketAddress;
/**
* Listener for events from the route source.
*/
@ -28,22 +26,23 @@ public interface FpmListener {
/**
* Handles an FPM message.
*
* @param peer FPM peer
* @param fpmMessage FPM message
*/
void fpmMessage(FpmHeader fpmMessage);
void fpmMessage(FpmPeer peer, FpmHeader fpmMessage);
/**
* Signifies that a new peer has attempted to initiate an FPM connection.
*
* @param address remote address of the peer
* @param peer FPM peer
* @return true if the connection should be admitted, otherwise false
*/
boolean peerConnected(SocketAddress address);
boolean peerConnected(FpmPeer peer);
/**
* Signifies that an FPM connection has been disconnected.
*
* @param address remote address of the peer
* @param peer FPM peer
*/
void peerDisconnected(SocketAddress address);
void peerDisconnected(FpmPeer peer);
}

View File

@ -16,7 +16,6 @@
package org.onosproject.routing.fpm;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@ -54,7 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Dictionary;
import java.util.LinkedList;
import java.util.List;
@ -84,9 +82,9 @@ public class FpmManager implements FpmInfoService {
private Channel serverChannel;
private ChannelGroup allChannels = new DefaultChannelGroup();
private Map<SocketAddress, Long> peers = new ConcurrentHashMap<>();
private Map<FpmPeer, Long> peers = new ConcurrentHashMap<>();
private Map<IpPrefix, Route> fpmRoutes = new ConcurrentHashMap<>();
private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
@Property(name = "clearRoutes", boolValue = true,
label = "Whether to clear routes when the FPM connection goes down")
@ -171,11 +169,11 @@ public class FpmManager implements FpmInfoService {
}
if (clearRoutes) {
clearRoutes();
peers.keySet().forEach(this::clearRoutes);
}
}
private void fpmMessage(FpmHeader fpmMessage) {
private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
Netlink netlink = fpmMessage.netlink();
RtNetlink rtNetlink = netlink.rtNetlink();
@ -221,12 +219,17 @@ public class FpmManager implements FpmInfoService {
}
route = new Route(Route.Source.FPM, prefix, gateway);
fpmRoutes.put(prefix, route);
Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
if (oldRoute != null) {
log.trace("Swapping {} with {}", oldRoute, route);
withdraws.add(oldRoute);
}
updates.add(route);
break;
case RTM_DELROUTE:
Route existing = fpmRoutes.remove(prefix);
Route existing = fpmRoutes.get(peer).remove(prefix);
if (existing == null) {
log.warn("Got delete for non-existent prefix");
return;
@ -246,41 +249,45 @@ public class FpmManager implements FpmInfoService {
}
private void clearRoutes() {
log.info("Clearing all routes");
routeService.withdraw(ImmutableList.copyOf(fpmRoutes.values()));
private void clearRoutes(FpmPeer peer) {
log.info("Clearing all routes for peer {}", peer);
Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
if (routes != null) {
routeService.withdraw(routes.values());
}
}
@Override
public Map<SocketAddress, Long> peers() {
public Map<FpmPeer, Long> peers() {
return ImmutableMap.copyOf(peers);
}
private class InternalFpmListener implements FpmListener {
@Override
public void fpmMessage(FpmHeader fpmMessage) {
FpmManager.this.fpmMessage(fpmMessage);
public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
FpmManager.this.fpmMessage(peer, fpmMessage);
}
@Override
public boolean peerConnected(SocketAddress address) {
if (peers.keySet().contains(address)) {
public boolean peerConnected(FpmPeer peer) {
if (peers.keySet().contains(peer)) {
return false;
}
peers.put(address, System.currentTimeMillis());
peers.put(peer, System.currentTimeMillis());
fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
return true;
}
@Override
public void peerDisconnected(SocketAddress address) {
log.info("FPM connection to {} went down", address);
public void peerDisconnected(FpmPeer peer) {
log.info("FPM connection to {} went down", peer);
if (clearRoutes) {
clearRoutes();
clearRoutes(peer);
}
peers.remove(address);
peers.remove(peer);
}
}

View File

@ -0,0 +1,95 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.routing.fpm;
import org.onlab.packet.IpAddress;
import java.net.InetSocketAddress;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Represents an FPM peer.
*/
public class FpmPeer {
private final IpAddress address;
private final int port;
/**
* Creates a new FPM peer.
*
* @param address peer IP address
* @param port peer TCP port number
*/
public FpmPeer(IpAddress address, int port) {
this.address = address;
this.port = port;
}
/**
* Returns the peers IP address.
*
* @return IP address
*/
public IpAddress address() {
return address;
}
/**
* Returns the peer port number.
*
* @return port number
*/
public int port() {
return port;
}
@Override
public int hashCode() {
return Objects.hash(address, port);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof FpmPeer)) {
return false;
}
FpmPeer that = (FpmPeer) other;
return Objects.equals(this.address, that.address) &&
Objects.equals(this.port, that.port);
}
@Override
public String toString() {
return toStringHelper(this)
.add("address", address)
.add("port", port)
.toString();
}
public static FpmPeer fromSocketAddress(InetSocketAddress address) {
return new FpmPeer(IpAddress.valueOf(address.getAddress()), address.getPort());
}
}

View File

@ -26,6 +26,9 @@ import org.onosproject.routing.fpm.protocol.FpmHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static com.google.common.base.Preconditions.checkNotNull;
/**
@ -38,6 +41,7 @@ public class FpmSessionHandler extends SimpleChannelHandler {
private final FpmListener fpmListener;
private Channel channel;
private FpmPeer us;
/**
* Class constructor.
@ -52,7 +56,7 @@ public class FpmSessionHandler extends SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
FpmHeader fpmMessage = (FpmHeader) e.getMessage();
fpmListener.fpmMessage(fpmMessage);
fpmListener.fpmMessage(us, fpmMessage);
}
@Override
@ -68,7 +72,15 @@ public class FpmSessionHandler extends SimpleChannelHandler {
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (!fpmListener.peerConnected(ctx.getChannel().getRemoteAddress())) {
SocketAddress socketAddress = ctx.getChannel().getRemoteAddress();
if (!(socketAddress instanceof InetSocketAddress)) {
throw new IllegalStateException("Address type is not InetSocketAddress");
}
us = FpmPeer.fromSocketAddress((InetSocketAddress) socketAddress);
if (!fpmListener.peerConnected(us)) {
log.error("Received new FPM connection while already connected");
ctx.getChannel().close();
return;
@ -94,7 +106,9 @@ public class FpmSessionHandler extends SimpleChannelHandler {
}
private void handleDisconnect() {
fpmListener.peerDisconnected(channel.getRemoteAddress());
if (us != null) {
fpmListener.peerDisconnected(us);
}
channel = null;
}
}

View File

@ -21,8 +21,6 @@ import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.routing.fpm.FpmInfoService;
import java.net.InetSocketAddress;
/**
* Displays the current FPM connections.
*/
@ -36,14 +34,8 @@ public class FpmConnectionsList extends AbstractShellCommand {
protected void execute() {
FpmInfoService fpmInfo = AbstractShellCommand.get(FpmInfoService.class);
fpmInfo.peers().forEach((socketAddress, timestamp) -> {
if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress inet = (InetSocketAddress) socketAddress;
print(FORMAT, inet.getHostString(), inet.getPort(), Tools.timeAgo(timestamp));
} else {
print("Unknown data format");
}
fpmInfo.peers().forEach((peer, timestamp) -> {
print(FORMAT, peer.address(), peer.port(), Tools.timeAgo(timestamp));
});
}
}