From 7bbe7b1850304e32af98709bc00cefd4dc89cf7e Mon Sep 17 00:00:00 2001 From: Andrea Campanella Date: Wed, 3 May 2017 16:03:38 -0700 Subject: [PATCH] [ONOS-6376] Netconf ssh connection through Apache Mina library Change-Id: If69fd89afe3082debc3c28a06debfed53426635c --- lib/BUCK | 8 +- lib/deps.json | 2 +- protocols/netconf/ctl/BUCK | 2 +- protocols/netconf/ctl/pom.xml | 3 +- .../ctl/impl/DefaultNetconfDevice.java | 34 +- .../ctl/impl/NetconfControllerImpl.java | 44 +- .../netconf/ctl/impl/NetconfSessionImpl.java | 23 +- .../ctl/impl/NetconfSessionMinaImpl.java | 838 ++++++++++++++++++ .../netconf/ctl/impl/NetconfStreamThread.java | 4 +- .../ctl/impl/NetconfControllerImplTest.java | 12 +- .../ctl/impl/NetconfSessionImplTest.java | 27 +- .../ctl/impl/NetconfSessionMinaImplTest.java | 472 ++++++++++ .../ctl/impl/NetconfSshdTestSubsystem.java | 13 +- providers/netconf/BUCK | 1 + .../device/impl/NetconfDeviceProvider.java | 2 +- 15 files changed, 1432 insertions(+), 53 deletions(-) create mode 100644 protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java create mode 100644 protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImplTest.java diff --git a/lib/BUCK b/lib/BUCK index 1a3d31448a..b5b73c4396 100644 --- a/lib/BUCK +++ b/lib/BUCK @@ -1272,10 +1272,10 @@ remote_jar ( remote_jar ( name = 'sshd-core', - out = 'sshd-core-0.14.0.jar', - url = 'mvn:org.apache.sshd:sshd-core:jar:0.14.0', - sha1 = 'cb12fa1b1b07fb5ce3aa4f99b189743897bd4fca', - maven_coords = 'org.apache.sshd:sshd-core:0.14.0', + out = 'sshd-core-1.4.0.jar', + url = 'mvn:org.apache.sshd:sshd-core:jar:1.4.0', + sha1 = 'c8f3d7457fc9979d1b9ec319f0229b89793c8e56', + maven_coords = 'org.apache.sshd:sshd-core:1.4.0', visibility = [ 'PUBLIC' ], ) diff --git a/lib/deps.json b/lib/deps.json index 420bc0af18..2e631e9a9b 100644 --- a/lib/deps.json +++ b/lib/deps.json @@ -225,6 +225,6 @@ "repo": "https://oss.sonatype.org/content/repositories/snapshots" }, "plexus-utils": "mvn:org.codehaus.plexus:plexus-utils:3.0.24", - "sshd-core": "mvn:org.apache.sshd:sshd-core:0.14.0" + "sshd-core": "mvn:org.apache.sshd:sshd-core:1.4.0" } } diff --git a/protocols/netconf/ctl/BUCK b/protocols/netconf/ctl/BUCK index 9ed9e0cbc3..dbd6e977e1 100644 --- a/protocols/netconf/ctl/BUCK +++ b/protocols/netconf/ctl/BUCK @@ -4,13 +4,13 @@ COMPILE_DEPS = [ '//protocols/netconf/api:onos-protocols-netconf-api', '//cli:onos-cli', '//lib:org.apache.karaf.shell.console', + '//lib:sshd-core', ] TEST_DEPS = [ '//lib:TEST_ADAPTERS', '//utils/osgi:onlab-osgi-tests', '//core/api:onos-api-tests', - '//lib:sshd-core' ] osgi_jar_with_tests ( diff --git a/protocols/netconf/ctl/pom.xml b/protocols/netconf/ctl/pom.xml index 40fa0a2ecf..d3241914b6 100644 --- a/protocols/netconf/ctl/pom.xml +++ b/protocols/netconf/ctl/pom.xml @@ -64,8 +64,7 @@ org.apache.sshd sshd-core - 0.14.0 - test + 1.4.0 diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java index c29b8954cf..3836f976f2 100644 --- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java +++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java @@ -36,7 +36,7 @@ public class DefaultNetconfDevice implements NetconfDevice { private NetconfDeviceInfo netconfDeviceInfo; private boolean deviceState = true; - protected NetconfSessionFactory sessionFactory = new SshNetconfSessionFactory(); + private final NetconfSessionFactory sessionFactory; private NetconfSession netconfSession; // will block until hello RPC handshake completes @@ -51,6 +51,31 @@ public class DefaultNetconfDevice implements NetconfDevice { public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo) throws NetconfException { netconfDeviceInfo = deviceInfo; + sessionFactory = new NetconfSessionMinaImpl.MinaSshNetconfSessionFactory(); + try { + netconfSession = sessionFactory.createNetconfSession(deviceInfo); + } catch (IOException e) { + deviceState = false; + throw new NetconfException("Cannot create connection and session for device " + + deviceInfo, e); + } + } + + // will block until hello RPC handshake completes + /** + * Creates a new default NETCONF device with the information provided. + * The device gets created only if no exception is thrown while connecting to + * it and establishing the NETCONF session. + * + * @param deviceInfo information about the device to be created. + * @param factory the factory used to create the session + * @throws NetconfException if there are problems in creating or establishing + * the underlying NETCONF connection and session. + */ + public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo, NetconfSessionFactory factory) + throws NetconfException { + netconfDeviceInfo = deviceInfo; + sessionFactory = factory; try { netconfSession = sessionFactory.createNetconfSession(deviceInfo); } catch (IOException e) { @@ -85,12 +110,5 @@ public class DefaultNetconfDevice implements NetconfDevice { return netconfDeviceInfo; } - public class SshNetconfSessionFactory implements NetconfSessionFactory { - - @Override - public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException { - return new NetconfSessionImpl(netconfDeviceInfo); - } - } } diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java index c03a6a4a3a..428c9d2c45 100644 --- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java +++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java @@ -65,6 +65,9 @@ import static org.onlab.util.Tools.groupedThreads; @Component(immediate = true) @Service public class NetconfControllerImpl implements NetconfController { + + private static final String ETHZ_SSH2 = "ethz-ssh2"; + private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 5; private static final String PROP_NETCONF_CONNECT_TIMEOUT = "netconfConnectTimeout"; @Property(name = PROP_NETCONF_CONNECT_TIMEOUT, intValue = DEFAULT_CONNECT_TIMEOUT_SECONDS, @@ -77,6 +80,12 @@ public class NetconfControllerImpl implements NetconfController { label = "Time (in seconds) waiting for a NetConf reply") protected static int netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS; + private static final String SSH_LIBRARY = "sshLibrary"; + private static final String APACHE_MINA = "apache_mina"; + @Property(name = SSH_LIBRARY, value = APACHE_MINA, + label = "Ssh Llbrary instead of Apache Mina (i.e. ethz-ssh2") + protected static String sshLibrary = APACHE_MINA; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ComponentConfigService cfgService; @@ -98,7 +107,7 @@ public class NetconfControllerImpl implements NetconfController { protected final ExecutorService executor = Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller", - "connection-reopen-%d", log)); + "connection-reopen-%d", log)); @Activate public void activate(ComponentContext context) { @@ -109,7 +118,12 @@ public class NetconfControllerImpl implements NetconfController { @Deactivate public void deactivate() { + netconfDeviceMap.values().forEach(device -> { + device.getSession().removeDeviceOutputListener(downListener); + device.disconnect(); + }); cfgService.unregisterProperties(getClass(), false); + netconfDeviceListeners.clear(); netconfDeviceMap.clear(); log.info("Stopped"); } @@ -119,6 +133,7 @@ public class NetconfControllerImpl implements NetconfController { if (context == null) { netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS; netconfConnectTimeout = DEFAULT_CONNECT_TIMEOUT_SECONDS; + sshLibrary = APACHE_MINA; log.info("No component configuration"); return; } @@ -127,6 +142,7 @@ public class NetconfControllerImpl implements NetconfController { int newNetconfReplyTimeout; int newNetconfConnectTimeout; + String newSshLibrary; try { String s = get(properties, PROP_NETCONF_REPLY_TIMEOUT); newNetconfReplyTimeout = isNullOrEmpty(s) ? @@ -136,6 +152,8 @@ public class NetconfControllerImpl implements NetconfController { newNetconfConnectTimeout = isNullOrEmpty(s) ? netconfConnectTimeout : Integer.parseInt(s.trim()); + newSshLibrary = get(properties, SSH_LIBRARY); + } catch (NumberFormatException e) { log.warn("Component configuration had invalid value", e); return; @@ -151,8 +169,11 @@ public class NetconfControllerImpl implements NetconfController { netconfReplyTimeout = newNetconfReplyTimeout; netconfConnectTimeout = newNetconfConnectTimeout; - log.info("Settings: {} = {}, {} = {}", - PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout); + sshLibrary = newSshLibrary; + log.info("Settings: {} = {}, {} = {}, {} = {}", + PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, + PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout, + SSH_LIBRARY, sshLibrary); } @Override @@ -302,7 +323,12 @@ public class NetconfControllerImpl implements NetconfController { private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory { @Override - public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException { + public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo) + throws NetconfException { + if (sshLibrary.equals(ETHZ_SSH2)) { + return new DefaultNetconfDevice(netconfDeviceInfo, + new NetconfSessionImpl.SshNetconfSessionFactory()); + } return new DefaultNetconfDevice(netconfDeviceInfo); } } @@ -320,8 +346,14 @@ public class NetconfControllerImpl implements NetconfController { log.info("Trying to reestablish connection with device {}", did); executor.execute(() -> { try { - netconfDeviceMap.get(did).getSession().checkAndReestablish(); - log.info("Connection with device {} was reestablished", did); + NetconfDevice device = netconfDeviceMap.get(did); + if (device != null) { + device.getSession().checkAndReestablish(); + log.info("Connection with device {} was reestablished", did); + } else { + log.warn("The device {} is not in the system", did); + } + } catch (NetconfException e) { log.error("The SSH connection with device {} couldn't be " + "reestablished due to {}. " + diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java index c0fcce39bd..b8086125a8 100644 --- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java +++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java @@ -24,6 +24,7 @@ import ch.ethz.ssh2.channel.Channel; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import org.onosproject.netconf.NetconfSessionFactory; import org.onosproject.netconf.TargetConfig; import org.onosproject.netconf.FilteringNetconfDeviceOutputEventListener; import org.onosproject.netconf.NetconfDeviceInfo; @@ -104,7 +105,7 @@ public class NetconfSessionImpl implements NetconfSession { private static final Pattern SESSION_ID_REGEX_PATTERN = Pattern.compile(SESSION_ID_REGEX); private String sessionID; - private final AtomicInteger messageIdInteger = new AtomicInteger(0); + private final AtomicInteger messageIdInteger = new AtomicInteger(1); private Connection netconfConnection; protected final NetconfDeviceInfo deviceInfo; private Session sshSession; @@ -288,7 +289,7 @@ public class NetconfSessionImpl implements NetconfSession { } private void sendHello() throws NetconfException { - serverHelloResponseOld = sendRequest(createHelloString()); + serverHelloResponseOld = sendRequest(createHelloString(), true); Matcher capabilityMatcher = CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponseOld); while (capabilityMatcher.find()) { deviceCapabilities.add(capabilityMatcher.group(1)); @@ -332,7 +333,6 @@ public class NetconfSessionImpl implements NetconfSession { try { connectionActive = false; replies.clear(); - messageIdInteger.set(0); startConnection(); if (subscriptionConnected) { log.debug("Restarting subscription with {}", deviceInfo.getDeviceId()); @@ -368,8 +368,15 @@ public class NetconfSessionImpl implements NetconfSession { } private String sendRequest(String request) throws NetconfException { + return sendRequest(request, false); + } + + private String sendRequest(String request, boolean isHello) throws NetconfException { checkAndReestablish(); - final int messageId = messageIdInteger.getAndIncrement(); + int messageId = -1; + if (!isHello) { + messageId = messageIdInteger.getAndIncrement(); + } request = formatRequestMessageId(request, messageId); request = formatXmlHeader(request); CompletableFuture futureReply = request(request, messageId); @@ -784,4 +791,12 @@ public class NetconfSessionImpl implements NetconfSession { } } } + + public static class SshNetconfSessionFactory implements NetconfSessionFactory { + + @Override + public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException { + return new NetconfSessionImpl(netconfDeviceInfo); + } + } } diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java new file mode 100644 index 0000000000..cdbde456bd --- /dev/null +++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java @@ -0,0 +1,838 @@ +/* + * Copyright 2015-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.netconf.ctl.impl; + +import com.google.common.annotations.Beta; +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.channel.ClientChannel; +import org.apache.sshd.client.future.ConnectFuture; +import org.apache.sshd.client.future.OpenFuture; +import org.apache.sshd.client.session.ClientSession; +import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.onosproject.netconf.NetconfDeviceInfo; +import org.onosproject.netconf.NetconfDeviceOutputEvent; +import org.onosproject.netconf.NetconfDeviceOutputEvent.Type; +import org.onosproject.netconf.NetconfDeviceOutputEventListener; +import org.onosproject.netconf.NetconfException; +import org.onosproject.netconf.NetconfSession; +import org.onosproject.netconf.NetconfSessionFactory; +import org.onosproject.netconf.TargetConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; +import java.security.KeyFactory; +import java.security.KeyPair; +import java.security.NoSuchAlgorithmException; +import java.security.PublicKey; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.X509EncodedKeySpec; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Implementation of a NETCONF session to talk to a device. + */ +public class NetconfSessionMinaImpl implements NetconfSession { + + private static final Logger log = LoggerFactory + .getLogger(NetconfSessionMinaImpl.class); + + private static final String ENDPATTERN = "]]>]]>"; + private static final String MESSAGE_ID_STRING = "message-id"; + private static final String HELLO = ""; + private static final String EQUAL = "="; + private static final String NUMBER_BETWEEN_QUOTES_MATCHER = "\"+([0-9]+)+\""; + private static final String RPC_OPEN = ""; + private static final String SUBTREE_FILTER_CLOSE = ""; + private static final String EDIT_CONFIG_OPEN = ""; + private static final String EDIT_CONFIG_CLOSE = ""; + private static final String TARGET_OPEN = ""; + private static final String TARGET_CLOSE = ""; + private static final String CONFIG_OPEN = ""; + private static final String CONFIG_CLOSE = ""; + private static final String XML_HEADER = + ""; + private static final String NETCONF_BASE_NAMESPACE = + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\""; + private static final String NETCONF_WITH_DEFAULTS_NAMESPACE = + "xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults\""; + private static final String SUBSCRIPTION_SUBTREE_FILTER_OPEN = + ""; + + private static final String INTERLEAVE_CAPABILITY_STRING = "urn:ietf:params:netconf:capability:interleave:1.0"; + + private static final String CAPABILITY_REGEX = "\\s*(.*?)\\s*"; + private static final Pattern CAPABILITY_REGEX_PATTERN = Pattern.compile(CAPABILITY_REGEX); + + private static final String SESSION_ID_REGEX = "\\s*(.*?)\\s*"; + private static final Pattern SESSION_ID_REGEX_PATTERN = Pattern.compile(SESSION_ID_REGEX); + private static final String RSA = "RSA"; + private static final String DSA = "DSA"; + + private String sessionID; + private final AtomicInteger messageIdInteger = new AtomicInteger(1); + protected final NetconfDeviceInfo deviceInfo; + private Iterable onosCapabilities = + Collections.singletonList("urn:ietf:params:netconf:base:1.0"); + + /* NOTE: the "serverHelloResponseOld" is deprecated in 1.10.0 and should eventually be removed */ + @Deprecated + private String serverHelloResponseOld; + private final Set deviceCapabilities = new LinkedHashSet<>(); + private NetconfStreamHandler streamHandler; + private Map> replies; + private List errorReplies; + private boolean subscriptionConnected = false; + private String notificationFilterSchema = null; + + private final Collection primaryListeners = + new CopyOnWriteArrayList<>(); + private final Collection children = + new CopyOnWriteArrayList<>(); + + + private ClientChannel channel = null; + private ClientSession session = null; + private SshClient client = null; + + + public NetconfSessionMinaImpl(NetconfDeviceInfo deviceInfo) throws NetconfException { + this.deviceInfo = deviceInfo; + replies = new ConcurrentHashMap<>(); + errorReplies = new ArrayList<>(); + startConnection(); + } + + private void startConnection() throws NetconfException { + try { + startClient(); + } catch (IOException e) { + throw new NetconfException("Failed to establish SSH with device " + deviceInfo, e); + } + } + + private void startClient() throws IOException { + client = SshClient.setUpDefaultClient(); + client.start(); + client.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); + startSession(); + } + + private void startSession() throws IOException { + final ConnectFuture connectFuture; + connectFuture = client.connect(deviceInfo.name(), + deviceInfo.ip().toString(), + deviceInfo.port()) + .verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS); + session = connectFuture.getSession(); + //Using the device ssh key if possible + if (deviceInfo.getKey() != null) { + ByteBuffer buf = StandardCharsets.UTF_8.encode(CharBuffer.wrap(deviceInfo.getKey())); + byte[] byteKey = new byte[buf.limit()]; + buf.get(byteKey); + PublicKey key; + try { + key = getPublicKey(byteKey, RSA); + } catch (NoSuchAlgorithmException | InvalidKeySpecException e) { + try { + key = getPublicKey(byteKey, DSA); + } catch (NoSuchAlgorithmException | InvalidKeySpecException e1) { + throw new NetconfException("Failed to authenticate session with device " + + deviceInfo + "check key to be the " + + "proper DSA or RSA key", e1); + } + } + //privateKye can set tu null because is not used by the method. + session.addPublicKeyIdentity(new KeyPair(key, null)); + } else { + session.addPasswordIdentity(deviceInfo.password()); + } + session.auth().verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS); + Set event = session.waitFor( + ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH, + ClientSession.ClientSessionEvent.CLOSED, + ClientSession.ClientSessionEvent.AUTHED), 0); + + if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) { + log.debug("Session closed {} {}", event, session.isClosed()); + throw new NetconfException("Failed to authenticate session with device " + + deviceInfo + "check the user/pwd or key"); + } + openChannel(); + } + + private PublicKey getPublicKey(byte[] keyBytes, String type) + throws NoSuchAlgorithmException, InvalidKeySpecException { + + X509EncodedKeySpec spec = + new X509EncodedKeySpec(keyBytes); + KeyFactory kf = KeyFactory.getInstance(type); + return kf.generatePublic(spec); + } + + private void openChannel() throws IOException { + channel = session.createSubsystemChannel("netconf"); + OpenFuture channelFuture = channel.open(); + if (channelFuture.await(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS)) { + if (channelFuture.isOpened()) { + streamHandler = new NetconfStreamThread(channel.getInvertedOut(), channel.getInvertedIn(), + channel.getInvertedErr(), deviceInfo, + new NetconfSessionDelegateImpl(), replies); + } else { + throw new NetconfException("Failed to open channel with device " + + deviceInfo); + } + sendHello(); + } + } + + + @Beta + protected void startSubscriptionStream(String filterSchema) throws NetconfException { + boolean openNewSession = false; + if (!deviceCapabilities.contains(INTERLEAVE_CAPABILITY_STRING)) { + log.info("Device {} doesn't support interleave, creating child session", deviceInfo); + openNewSession = true; + + } else if (subscriptionConnected && + notificationFilterSchema != null && + !Objects.equal(filterSchema, notificationFilterSchema)) { + // interleave supported and existing filter is NOT "no filtering" + // and was requested with different filtering schema + log.info("Cannot use existing session for subscription {} ({})", + deviceInfo, filterSchema); + openNewSession = true; + } + + if (openNewSession) { + log.info("Creating notification session to {} with filter {}", + deviceInfo, filterSchema); + NetconfSession child = new NotificationSession(deviceInfo); + + child.addDeviceOutputListener(new NotificationForwarder()); + + child.startSubscription(filterSchema); + children.add(child); + return; + } + + // request to start interleaved notification session + String reply = sendRequest(createSubscriptionString(filterSchema)); + if (!checkReply(reply)) { + throw new NetconfException("Subscription not successful with device " + + deviceInfo + " with reply " + reply); + } + subscriptionConnected = true; + } + + @Override + public void startSubscription() throws NetconfException { + if (!subscriptionConnected) { + startSubscriptionStream(null); + } + streamHandler.setEnableNotifications(true); + } + + @Beta + @Override + public void startSubscription(String filterSchema) throws NetconfException { + if (!subscriptionConnected) { + notificationFilterSchema = filterSchema; + startSubscriptionStream(filterSchema); + } + streamHandler.setEnableNotifications(true); + } + + @Beta + protected String createSubscriptionString(String filterSchema) { + StringBuilder subscriptionbuffer = new StringBuilder(); + subscriptionbuffer.append("\n"); + subscriptionbuffer.append(" \n"); + // FIXME Only subtree filtering supported at the moment. + if (filterSchema != null) { + subscriptionbuffer.append(" "); + subscriptionbuffer.append(SUBSCRIPTION_SUBTREE_FILTER_OPEN).append(NEW_LINE); + subscriptionbuffer.append(filterSchema).append(NEW_LINE); + subscriptionbuffer.append(" "); + subscriptionbuffer.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE); + } + subscriptionbuffer.append(" \n"); + subscriptionbuffer.append("\n"); + subscriptionbuffer.append(ENDPATTERN); + return subscriptionbuffer.toString(); + } + + @Override + public void endSubscription() throws NetconfException { + if (subscriptionConnected) { + streamHandler.setEnableNotifications(false); + } else { + throw new NetconfException("Subscription does not exist."); + } + } + + private void sendHello() throws NetconfException { + serverHelloResponseOld = sendRequest(createHelloString(), true); + Matcher capabilityMatcher = CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponseOld); + while (capabilityMatcher.find()) { + deviceCapabilities.add(capabilityMatcher.group(1)); + } + sessionID = String.valueOf(-1); + Matcher sessionIDMatcher = SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponseOld); + if (sessionIDMatcher.find()) { + sessionID = sessionIDMatcher.group(1); + } else { + throw new NetconfException("Missing SessionID in server hello " + + "reponse."); + } + + } + + private String createHelloString() { + StringBuilder hellobuffer = new StringBuilder(); + hellobuffer.append(XML_HEADER); + hellobuffer.append("\n"); + hellobuffer.append("\n"); + hellobuffer.append(" \n"); + onosCapabilities.forEach( + cap -> hellobuffer.append(" ") + .append(cap) + .append("\n")); + hellobuffer.append(" \n"); + hellobuffer.append("\n"); + hellobuffer.append(ENDPATTERN); + return hellobuffer.toString(); + + } + + @Override + public void checkAndReestablish() throws NetconfException { + try { + if (client.isClosed()) { + log.debug("Trying to restart the whole SSH connection with {}", deviceInfo.getDeviceId()); + cleanUp(); + startConnection(); + } else if (session.isClosed()) { + log.debug("Trying to restart the session with {}", session, deviceInfo.getDeviceId()); + cleanUp(); + startSession(); + } else if (channel.isClosed()) { + log.debug("Trying to reopen the channel with {}", deviceInfo.getDeviceId()); + cleanUp(); + openChannel(); + } + if (subscriptionConnected) { + log.debug("Restarting subscription with {}", deviceInfo.getDeviceId()); + subscriptionConnected = false; + startSubscription(notificationFilterSchema); + } + } catch (IOException e) { + log.error("Can't reopen connection for device {}", e.getMessage()); + throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e); + } + } + + private void cleanUp() { + //makes sure everything is at a clean state. + replies.clear(); + } + + @Override + public String requestSync(String request) throws NetconfException { + if (!request.contains(ENDPATTERN)) { + request = request + NEW_LINE + ENDPATTERN; + } + String reply = sendRequest(request); + checkReply(reply); + return reply; + } + + @Override + @Deprecated + public CompletableFuture request(String request) { + return streamHandler.sendMessage(request); + } + + private CompletableFuture request(String request, int messageId) { + return streamHandler.sendMessage(request, messageId); + } + + private String sendRequest(String request) throws NetconfException { + return sendRequest(request, false); + } + + private String sendRequest(String request, boolean isHello) throws NetconfException { + checkAndReestablish(); + int messageId = -1; + if (!isHello) { + messageId = messageIdInteger.getAndIncrement(); + } + request = formatRequestMessageId(request, messageId); + request = formatXmlHeader(request); + CompletableFuture futureReply = request(request, messageId); + int replyTimeout = NetconfControllerImpl.netconfReplyTimeout; + String rp; + try { + rp = futureReply.get(replyTimeout, TimeUnit.SECONDS); + replies.remove(messageId); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new NetconfException("No matching reply for request " + request, e); + } + log.debug("Result {} from request {} to device {}", rp, request, deviceInfo); + return rp.trim(); + } + + private String formatRequestMessageId(String request, int messageId) { + if (request.contains(MESSAGE_ID_STRING)) { + //FIXME if application provides his own counting of messages this fails that count + request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER, + MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\""); + } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) { + //FIXME find out a better way to enforce the presence of message-id + request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\"" + + messageId + "\"" + ">"); + } + return request; + } + + private String formatXmlHeader(String request) { + if (!request.contains(XML_HEADER)) { + //FIXME if application provieds his own XML header of different type there is a clash + request = XML_HEADER + "\n" + request; + } + return request; + } + + @Override + public String doWrappedRpc(String request) throws NetconfException { + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append(RPC_OPEN); + rpc.append(MESSAGE_ID_STRING); + rpc.append(EQUAL); + rpc.append("\""); + rpc.append(messageIdInteger.get()); + rpc.append("\" "); + rpc.append(NETCONF_BASE_NAMESPACE).append(">\n"); + rpc.append(request); + rpc.append(RPC_CLOSE).append(NEW_LINE); + rpc.append(ENDPATTERN); + String reply = sendRequest(rpc.toString()); + checkReply(reply); + return reply; + } + + @Override + public String get(String request) throws NetconfException { + return requestSync(request); + } + + @Override + public String get(String filterSchema, String withDefaultsMode) throws NetconfException { + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append(RPC_OPEN); + rpc.append(MESSAGE_ID_STRING); + rpc.append(EQUAL); + rpc.append("\""); + rpc.append(messageIdInteger.get()); + rpc.append("\" "); + rpc.append(NETCONF_BASE_NAMESPACE).append(">\n"); + rpc.append(GET_OPEN).append(NEW_LINE); + if (filterSchema != null) { + rpc.append(SUBTREE_FILTER_OPEN).append(NEW_LINE); + rpc.append(filterSchema).append(NEW_LINE); + rpc.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE); + } + if (withDefaultsMode != null) { + rpc.append(WITH_DEFAULT_OPEN).append(NETCONF_WITH_DEFAULTS_NAMESPACE).append(">"); + rpc.append(withDefaultsMode).append(WITH_DEFAULT_CLOSE).append(NEW_LINE); + } + rpc.append(GET_CLOSE).append(NEW_LINE); + rpc.append(RPC_CLOSE).append(NEW_LINE); + rpc.append(ENDPATTERN); + String reply = sendRequest(rpc.toString()); + checkReply(reply); + return reply; + } + + @Override + public String getConfig(TargetConfig netconfTargetConfig) throws NetconfException { + return getConfig(netconfTargetConfig, null); + } + + @Override + public String getConfig(String netconfTargetConfig) throws NetconfException { + return getConfig(TargetConfig.toTargetConfig(netconfTargetConfig)); + } + + @Override + public String getConfig(String netconfTargetConfig, String configurationFilterSchema) throws NetconfException { + return getConfig(TargetConfig.toTargetConfig(netconfTargetConfig), configurationFilterSchema); + } + + @Override + public String getConfig(TargetConfig netconfTargetConfig, String configurationSchema) throws NetconfException { + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append("\n"); + rpc.append("\n"); + rpc.append("\n"); + rpc.append("<").append(netconfTargetConfig).append("/>"); + rpc.append(""); + if (configurationSchema != null) { + rpc.append("\n"); + rpc.append(configurationSchema).append("\n"); + rpc.append("\n"); + } + rpc.append("\n"); + rpc.append("\n"); + rpc.append(ENDPATTERN); + String reply = sendRequest(rpc.toString()); + return checkReply(reply) ? reply : "ERROR " + reply; + } + + @Override + public boolean editConfig(String newConfiguration) throws NetconfException { + newConfiguration = newConfiguration + ENDPATTERN; + return checkReply(sendRequest(newConfiguration)); + } + + @Override + public boolean editConfig(String netconfTargetConfig, String mode, String newConfiguration) + throws NetconfException { + return editConfig(TargetConfig.toTargetConfig(netconfTargetConfig), mode, newConfiguration); + } + + @Override + public boolean editConfig(TargetConfig netconfTargetConfig, String mode, String newConfiguration) + throws NetconfException { + newConfiguration = newConfiguration.trim(); + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append(RPC_OPEN); + rpc.append(MESSAGE_ID_STRING); + rpc.append(EQUAL); + rpc.append("\""); + rpc.append(messageIdInteger.get()); + rpc.append("\" "); + rpc.append(NETCONF_BASE_NAMESPACE).append(">\n"); + rpc.append(EDIT_CONFIG_OPEN).append("\n"); + rpc.append(TARGET_OPEN); + rpc.append("<").append(netconfTargetConfig).append("/>"); + rpc.append(TARGET_CLOSE).append("\n"); + if (mode != null) { + rpc.append(DEFAULT_OPERATION_OPEN); + rpc.append(mode); + rpc.append(DEFAULT_OPERATION_CLOSE).append("\n"); + } + rpc.append(CONFIG_OPEN).append("\n"); + rpc.append(newConfiguration); + rpc.append(CONFIG_CLOSE).append("\n"); + rpc.append(EDIT_CONFIG_CLOSE).append("\n"); + rpc.append(RPC_CLOSE); + rpc.append(ENDPATTERN); + log.debug(rpc.toString()); + String reply = sendRequest(rpc.toString()); + return checkReply(reply); + } + + @Override + public boolean copyConfig(String netconfTargetConfig, String newConfiguration) throws NetconfException { + return copyConfig(TargetConfig.toTargetConfig(netconfTargetConfig), newConfiguration); + } + + @Override + public boolean copyConfig(TargetConfig netconfTargetConfig, String newConfiguration) + throws NetconfException { + newConfiguration = newConfiguration.trim(); + if (!newConfiguration.startsWith("")) { + newConfiguration = "" + newConfiguration + + ""; + } + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append(RPC_OPEN); + rpc.append(NETCONF_BASE_NAMESPACE).append(">\n"); + rpc.append(""); + rpc.append(""); + rpc.append("<").append(netconfTargetConfig).append("/>"); + rpc.append(""); + rpc.append(""); + rpc.append(newConfiguration); + rpc.append(""); + rpc.append(""); + rpc.append(""); + rpc.append(ENDPATTERN); + return checkReply(sendRequest(rpc.toString())); + } + + @Override + public boolean deleteConfig(String netconfTargetConfig) throws NetconfException { + return deleteConfig(TargetConfig.toTargetConfig(netconfTargetConfig)); + } + + @Override + public boolean deleteConfig(TargetConfig netconfTargetConfig) throws NetconfException { + if (netconfTargetConfig.equals(TargetConfig.RUNNING)) { + log.warn("Target configuration for delete operation can't be \"running\"", + netconfTargetConfig); + return false; + } + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append(""); + rpc.append(""); + rpc.append(""); + rpc.append("<").append(netconfTargetConfig).append("/>"); + rpc.append(""); + rpc.append(""); + rpc.append(""); + rpc.append(ENDPATTERN); + return checkReply(sendRequest(rpc.toString())); + } + + @Override + public boolean lock(String configType) throws NetconfException { + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append("\n"); + rpc.append(""); + rpc.append(""); + rpc.append("<"); + rpc.append(configType); + rpc.append("/>"); + rpc.append(""); + rpc.append(""); + rpc.append(""); + rpc.append(ENDPATTERN); + String lockReply = sendRequest(rpc.toString()); + return checkReply(lockReply); + } + + @Override + public boolean unlock(String configType) throws NetconfException { + StringBuilder rpc = new StringBuilder(XML_HEADER); + rpc.append("\n"); + rpc.append(""); + rpc.append(""); + rpc.append("<"); + rpc.append(configType); + rpc.append("/>"); + rpc.append(""); + rpc.append(""); + rpc.append(""); + rpc.append(ENDPATTERN); + String unlockReply = sendRequest(rpc.toString()); + return checkReply(unlockReply); + } + + @Override + public boolean lock() throws NetconfException { + return lock("running"); + } + + @Override + public boolean unlock() throws NetconfException { + return unlock("running"); + } + + @Override + public boolean close() throws NetconfException { + return close(false); + } + + private boolean close(boolean force) throws NetconfException { + StringBuilder rpc = new StringBuilder(); + rpc.append(""); + if (force) { + rpc.append(""); + } else { + rpc.append(""); + } + rpc.append(""); + rpc.append(ENDPATTERN); + return checkReply(sendRequest(rpc.toString())) || close(true); + } + + @Override + public String getSessionId() { + return sessionID; + } + + @Override + public Set getDeviceCapabilitiesSet() { + return Collections.unmodifiableSet(deviceCapabilities); + } + + @Deprecated + @Override + public String getServerCapabilities() { + return serverHelloResponseOld; + } + + @Deprecated + @Override + public void setDeviceCapabilities(List capabilities) { + onosCapabilities = capabilities; + } + + @Override + public void setOnosCapabilities(Iterable capabilities) { + onosCapabilities = capabilities; + } + + + @Override + public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) { + streamHandler.addDeviceEventListener(listener); + primaryListeners.add(listener); + } + + @Override + public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) { + primaryListeners.remove(listener); + streamHandler.removeDeviceEventListener(listener); + } + + private boolean checkReply(String reply) throws NetconfException { + if (reply != null) { + if (!reply.contains("")) { + log.debug("Device {} sent reply {}", deviceInfo, reply); + return true; + } else if (reply.contains("") + || (reply.contains("") + && reply.contains("warning"))) { + log.debug("Device {} sent reply {}", deviceInfo, reply); + return true; + } + } + log.warn("Device {} has error in reply {}", deviceInfo, reply); + return false; + } + + static class NotificationSession extends NetconfSessionMinaImpl { + + private String notificationFilter; + + NotificationSession(NetconfDeviceInfo deviceInfo) + throws NetconfException { + super(deviceInfo); + } + + @Override + protected void startSubscriptionStream(String filterSchema) + throws NetconfException { + + notificationFilter = filterSchema; + requestSync(createSubscriptionString(filterSchema)); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("deviceInfo", deviceInfo) + .add("sessionID", getSessionId()) + .add("notificationFilter", notificationFilter) + .toString(); + } + } + + /** + * Listener attached to child session for notification streaming. + *

+ * Forwards all notification event from child session to primary session + * listeners. + */ + private final class NotificationForwarder + implements NetconfDeviceOutputEventListener { + + @Override + public boolean isRelevant(NetconfDeviceOutputEvent event) { + return event.type() == Type.DEVICE_NOTIFICATION; + } + + @Override + public void event(NetconfDeviceOutputEvent event) { + primaryListeners.forEach(lsnr -> { + if (lsnr.isRelevant(event)) { + lsnr.event(event); + } + }); + } + } + + public class NetconfSessionDelegateImpl implements NetconfSessionDelegate { + + @Override + public void notify(NetconfDeviceOutputEvent event) { + Optional messageId = event.getMessageID(); + log.debug("messageID {}, waiting replies messageIDs {}", messageId, + replies.keySet()); + if (!messageId.isPresent()) { + errorReplies.add(event.getMessagePayload()); + log.error("Device {} sent error reply {}", + event.getDeviceInfo(), event.getMessagePayload()); + return; + } + CompletableFuture completedReply = + replies.get(messageId.get()); + if (completedReply != null) { + completedReply.complete(event.getMessagePayload()); + } + } + } + + public static class MinaSshNetconfSessionFactory implements NetconfSessionFactory { + + @Override + public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException { + return new NetconfSessionMinaImpl(netconfDeviceInfo); + } + } +} diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java index 212a4e68ae..44206e3e22 100644 --- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java +++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java @@ -203,7 +203,7 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler netconfDeviceInfo, deviceReply); NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent( NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED, - null, null, Optional.of(-1), netconfDeviceInfo); + null, null, Optional.of(-2), netconfDeviceInfo); netconfDeviceEventListeners.forEach( listener -> listener.event(event)); this.interrupt(); @@ -256,7 +256,7 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler return Optional.of(messageId); } if (reply.contains(HELLO)) { - return Optional.of(0); + return Optional.of(-1); } return Optional.empty(); } diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java index 8287e6d785..666b0ad43a 100644 --- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java +++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java @@ -140,12 +140,13 @@ public class NetconfControllerImplTest { assertEquals("Incorrect NetConf connect timeout, should be default", 5, ctrl.netconfConnectTimeout); assertEquals("Incorrect NetConf reply timeout, should be default", - 5, ctrl.netconfReplyTimeout); + 5, ctrl.netconfReplyTimeout); ctrl.activate(null); assertEquals("Incorrect NetConf connect timeout, should be default", 5, ctrl.netconfConnectTimeout); assertEquals("Incorrect NetConf reply timeout, should be default", - 5, ctrl.netconfReplyTimeout); } + 5, ctrl.netconfReplyTimeout); + } /** * Test modification of component configuration. @@ -153,14 +154,15 @@ public class NetconfControllerImplTest { @Test public void testModified() { assertEquals("Incorrect NetConf connect timeout, should be default", - 5, ctrl.netconfConnectTimeout); + 5, ctrl.netconfConnectTimeout); assertEquals("Incorrect NetConf session timeout, should be default", 5, ctrl.netconfReplyTimeout); ctrl.modified(context); assertEquals("Incorrect NetConf connect timeout, should be default", - 2, ctrl.netconfConnectTimeout); + 2, ctrl.netconfConnectTimeout); assertEquals("Incorrect NetConf session timeout", 1, ctrl.netconfReplyTimeout); + assertEquals("ethz-ssh2", ctrl.sshLibrary); } /** @@ -398,6 +400,8 @@ public class NetconfControllerImplTest { return "2"; } else if (key.equals("netconfReplyTimeout")) { return "1"; + } else if (key.equals("sshLibrary")) { + return "ethz-ssh2"; } return null; } diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java index 27223b80d4..846642cd19 100644 --- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java +++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertFalse; import static org.onosproject.netconf.TargetConfig.RUNNING; import static org.onosproject.netconf.TargetConfig.CANDIDATE; -import java.util.ArrayList; +import java.io.File; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -35,12 +35,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.regex.Pattern; -import org.apache.sshd.SshServer; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.server.Command; -import org.apache.sshd.server.PasswordAuthenticator; -import org.apache.sshd.server.UserAuth; -import org.apache.sshd.server.auth.UserAuthPassword; +import org.apache.sshd.server.SshServer; +import org.apache.sshd.server.auth.password.PasswordAuthenticator; import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; import org.apache.sshd.server.session.ServerSession; import org.junit.AfterClass; @@ -59,7 +57,7 @@ import com.google.common.collect.ImmutableList; /** * Unit tests for NetconfSession. - * + *

* Sets up an SSH Server with Apache SSHD and connects to it using 2 clients * Truly verifies that the NETCONF flows are compliant with a NETCONF server. */ @@ -77,8 +75,8 @@ public class NetconfSessionImplTest { private static final String SAMPLE_REQUEST = "" - + "" - + ""; + + "" + + ""; private static final String EDIT_CONFIG_REQUEST = "> userAuthFactories = new ArrayList<>(); - userAuthFactories.add(new UserAuthPassword.Factory()); - sshServerNetconf.setUserAuthFactories(userAuthFactories); sshServerNetconf.setPasswordAuthenticator( new PasswordAuthenticator() { @Override @@ -126,7 +121,9 @@ public class NetconfSessionImplTest { } }); sshServerNetconf.setPort(PORT_NUMBER); - sshServerNetconf.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(TEST_SERFILE)); + SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider(); + provider.setFile(new File(TEST_SERFILE)); + sshServerNetconf.setKeyPairProvider(provider); sshServerNetconf.setSubsystemFactories( Arrays.>asList(new NetconfSshdTestSubsystem.Factory())); sshServerNetconf.open(); @@ -224,10 +221,10 @@ public class NetconfSessionImplTest { assertNotNull("Incorrect sessionId", session1.getSessionId()); try { assertTrue("NETCONF get-config running command failed. ", - GET_REPLY_PATTERN.matcher(session1.getConfig(RUNNING, SAMPLE_REQUEST)).matches()); + GET_REPLY_PATTERN.matcher(session1.getConfig(RUNNING, SAMPLE_REQUEST)).matches()); assertTrue("NETCONF get-config candidate command failed. ", - GET_REPLY_PATTERN.matcher(session1.getConfig(CANDIDATE, SAMPLE_REQUEST)).matches()); + GET_REPLY_PATTERN.matcher(session1.getConfig(CANDIDATE, SAMPLE_REQUEST)).matches()); } catch (NetconfException e) { e.printStackTrace(); @@ -242,7 +239,7 @@ public class NetconfSessionImplTest { assertNotNull("Incorrect sessionId", session1.getSessionId()); try { assertTrue("NETCONF get running command failed. ", - GET_REPLY_PATTERN.matcher(session1.get(SAMPLE_REQUEST, null)).matches()); + GET_REPLY_PATTERN.matcher(session1.get(SAMPLE_REQUEST, null)).matches()); } catch (NetconfException e) { e.printStackTrace(); diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImplTest.java new file mode 100644 index 0000000000..d8c1202f3d --- /dev/null +++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImplTest.java @@ -0,0 +1,472 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.netconf.ctl.impl; + +import com.google.common.collect.ImmutableList; +import org.apache.sshd.common.NamedFactory; +import org.apache.sshd.server.Command; +import org.apache.sshd.server.SshServer; +import org.apache.sshd.server.auth.password.PasswordAuthenticator; +import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.apache.sshd.server.session.ServerSession; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onlab.junit.TestTools; +import org.onlab.packet.Ip4Address; +import org.onosproject.netconf.NetconfDeviceInfo; +import org.onosproject.netconf.NetconfException; +import org.onosproject.netconf.NetconfSession; +import org.onosproject.netconf.TargetConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.regex.Pattern; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.*; +import static org.onosproject.netconf.TargetConfig.CANDIDATE; +import static org.onosproject.netconf.TargetConfig.RUNNING; + +/** + * Unit tests for NetconfSession. + *

+ * Sets up an SSH Server with Apache SSHD and connects to it using 2 clients + * Truly verifies that the NETCONF flows are compliant with a NETCONF server. + */ +public class NetconfSessionMinaImplTest { + private static final Logger log = LoggerFactory + .getLogger(NetconfStreamThread.class); + + private static final int PORT_NUMBER = TestTools.findAvailablePort(50830); + private static final String TEST_USERNAME = "netconf"; + private static final String TEST_PASSWORD = "netconf123"; + private static final String TEST_HOSTNAME = "127.0.0.1"; + + private static final String TEST_SERFILE = + System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + "testkey.ser"; + + private static final String SAMPLE_REQUEST = + "" + + "" + + ""; + + private static final String EDIT_CONFIG_REQUEST = + "\n" + + "\n" + + "\n" + + "\n" + + "" + + "\n" + + "\n" + + "]]>]]>"; + + static final List DEFAULT_CAPABILITIES = ImmutableList.builder() + .add("urn:ietf:params:netconf:base:1.0") + .add("urn:ietf:params:netconf:base:1.1") + .add("urn:ietf:params:netconf:capability:writable-running:1.0") + .add("urn:ietf:params:netconf:capability:candidate:1.0") + .add("urn:ietf:params:netconf:capability:startup:1.0") + .add("urn:ietf:params:netconf:capability:rollback-on-error:1.0") + .add("urn:ietf:params:netconf:capability:interleave:1.0") + .add("urn:ietf:params:netconf:capability:notification:1.0") + .add("urn:ietf:params:netconf:capability:validate:1.0") + .add("urn:ietf:params:netconf:capability:validate:1.1") + .build(); + + + private static NetconfSession session1; + private static NetconfSession session2; + private static SshServer sshServerNetconf; + + @BeforeClass + public static void setUp() throws Exception { + sshServerNetconf = SshServer.setUpDefaultServer(); + sshServerNetconf.setPasswordAuthenticator( + new PasswordAuthenticator() { + @Override + public boolean authenticate( + String username, + String password, + ServerSession session) { + return TEST_USERNAME.equals(username) && TEST_PASSWORD.equals(password); + } + }); + sshServerNetconf.setPort(PORT_NUMBER); + SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider(); + provider.setFile(new File(TEST_SERFILE)); + sshServerNetconf.setKeyPairProvider(provider); + sshServerNetconf.setSubsystemFactories( + Arrays.>asList(new NetconfSshdTestSubsystem.Factory())); + sshServerNetconf.open(); + log.info("SSH Server opened on port {}", PORT_NUMBER); + + NetconfDeviceInfo deviceInfo = new NetconfDeviceInfo( + TEST_USERNAME, TEST_PASSWORD, Ip4Address.valueOf(TEST_HOSTNAME), PORT_NUMBER); + + session1 = new NetconfSessionMinaImpl(deviceInfo); + log.info("Started NETCONF Session {} with test SSHD server in Unit Test", session1.getSessionId()); + assertTrue("Incorrect sessionId", !session1.getSessionId().equalsIgnoreCase("-1")); + assertTrue("Incorrect sessionId", !session1.getSessionId().equalsIgnoreCase("0")); + assertThat(session1.getDeviceCapabilitiesSet(), containsInAnyOrder(DEFAULT_CAPABILITIES.toArray())); + + session2 = new NetconfSessionMinaImpl(deviceInfo); + log.info("Started NETCONF Session {} with test SSHD server in Unit Test", session2.getSessionId()); + assertTrue("Incorrect sessionId", !session2.getSessionId().equalsIgnoreCase("-1")); + assertTrue("Incorrect sessionId", !session2.getSessionId().equalsIgnoreCase("0")); + assertThat(session2.getDeviceCapabilitiesSet(), containsInAnyOrder(DEFAULT_CAPABILITIES.toArray())); + } + + @AfterClass + public static void tearDown() throws Exception { + if (session1 != null) { + session1.close(); + } + if (session2 != null) { + session2.close(); + } + + sshServerNetconf.stop(); + } + + @Test + public void testEditConfigRequest() { + log.info("Starting edit-config async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertTrue("NETCONF edit-config command failed", + session1.editConfig(TargetConfig.RUNNING.toString(), + null, SAMPLE_REQUEST)); + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF edit-config test failed: " + e.getMessage()); + } + log.info("Finishing edit-config async"); + } + + @Test + public void testEditConfigRequestWithOnlyNewConfiguration() { + log.info("Starting edit-config async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertTrue("NETCONF edit-config command failed", + session1.editConfig(EDIT_CONFIG_REQUEST)); + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF edit-config test failed: " + e.getMessage()); + } + log.info("Finishing edit-config async"); + } + + @Test + public void testDeleteConfigRequestWithRunningTargetConfiguration() { + log.info("Starting delete-config async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertFalse("NETCONF delete-config command failed", + session1.deleteConfig(TargetConfig.RUNNING)); + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF delete-config test failed: " + e.getMessage()); + } + log.info("Finishing delete-config async"); + } + + @Test + public void testCopyConfigRequest() { + log.info("Starting copy-config async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertTrue("NETCONF copy-config command failed", + session1.copyConfig(TargetConfig.RUNNING.toString(), + "candidate")); + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF edit-config test failed: " + e.getMessage()); + } + log.info("Finishing copy-config async"); + } + + @Test + public void testGetConfigRequest() { + log.info("Starting get-config async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertTrue("NETCONF get-config running command failed. ", + GET_REPLY_PATTERN.matcher(session1.getConfig(RUNNING, SAMPLE_REQUEST)).matches()); + + assertTrue("NETCONF get-config candidate command failed. ", + GET_REPLY_PATTERN.matcher(session1.getConfig(CANDIDATE, SAMPLE_REQUEST)).matches()); + + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF get-config test failed: " + e.getMessage()); + } + log.info("Finishing get-config async"); + } + + @Test + public void testGetRequest() { + log.info("Starting get async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertTrue("NETCONF get running command failed. ", + GET_REPLY_PATTERN.matcher(session1.get(SAMPLE_REQUEST, null)).matches()); + + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF get test failed: " + e.getMessage()); + } + log.info("Finishing get async"); + } + + @Test + public void testLockRequest() { + log.info("Starting lock async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertTrue("NETCONF lock request failed", session1.lock()); + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF lock test failed: " + e.getMessage()); + } + log.info("Finishing lock async"); + } + + @Test + public void testUnLockRequest() { + log.info("Starting unlock async"); + assertNotNull("Incorrect sessionId", session1.getSessionId()); + try { + assertTrue("NETCONF unlock request failed", session1.unlock()); + } catch (NetconfException e) { + e.printStackTrace(); + fail("NETCONF unlock test failed: " + e.getMessage()); + } + log.info("Finishing unlock async"); + } + + + @Test + public void testConcurrentSameSessionAccess() throws InterruptedException { + NCCopyConfigCallable testCopyConfig1 = new NCCopyConfigCallable(session1, RUNNING, "candidate"); + NCCopyConfigCallable testCopyConfig2 = new NCCopyConfigCallable(session1, RUNNING, "startup"); + + FutureTask futureCopyConfig1 = new FutureTask<>(testCopyConfig1); + FutureTask futureCopyConfig2 = new FutureTask<>(testCopyConfig2); + + ExecutorService executor = Executors.newFixedThreadPool(2); + log.info("Starting concurrent execution of copy-config through same session"); + executor.execute(futureCopyConfig1); + executor.execute(futureCopyConfig2); + + int count = 0; + while (count < 10) { + if (futureCopyConfig1.isDone() && futureCopyConfig2.isDone()) { + executor.shutdown(); + log.info("Finished concurrent same session execution"); + return; + } + Thread.sleep(100L); + count++; + } + fail("NETCONF test failed to complete."); + } + + @Test + public void test2SessionAccess() throws InterruptedException { + NCCopyConfigCallable testCopySession1 = new NCCopyConfigCallable(session1, RUNNING, "candidate"); + NCCopyConfigCallable testCopySession2 = new NCCopyConfigCallable(session2, RUNNING, "candidate"); + + FutureTask futureCopySession1 = new FutureTask<>(testCopySession1); + FutureTask futureCopySession2 = new FutureTask<>(testCopySession2); + + ExecutorService executor = Executors.newFixedThreadPool(2); + log.info("Starting concurrent execution of copy-config through 2 different sessions"); + executor.execute(futureCopySession1); + executor.execute(futureCopySession2); + + int count = 0; + while (count < 10) { + if (futureCopySession1.isDone() && futureCopySession2.isDone()) { + executor.shutdown(); + log.info("Finished concurrent 2 session execution"); + return; + } + Thread.sleep(100L); + count++; + } + fail("NETCONF test failed to complete."); + } + + + public static String getTestHelloReply(Optional sessionId) { + return getTestHelloReply(DEFAULT_CAPABILITIES, sessionId); + } + + public static String getTestHelloReply(Collection capabilities, Optional sessionId) { + StringBuffer sb = new StringBuffer(); + + sb.append(""); + sb.append(""); + capabilities.forEach(capability -> { + sb.append("").append(capability).append(""); + }); + sb.append(""); + if (sessionId.isPresent()) { + sb.append(""); + sb.append(sessionId.get().toString()); + sb.append(""); + } + sb.append(""); + + return sb.toString(); + } + + public static String getOkReply(Optional messageId) { + StringBuffer sb = new StringBuffer("\n"); + sb.append(""); + } + sb.append(""); + sb.append(""); + return sb.toString(); + } + + public static String getGetReply(Optional messageId) { + StringBuffer sb = new StringBuffer("\n"); + sb.append(""); + } + sb.append("\n"); + sb.append(SAMPLE_REQUEST); + sb.append("\n"); + sb.append(""); + return sb.toString(); + } + + public static final Pattern HELLO_REQ_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "( *)()\\R?" + + "( *)(urn:ietf:params:netconf:base:1.0)\\R?" + + "( *)()\\R?" + + "()\\R? *", + Pattern.DOTALL); + + public static final Pattern EDIT_CONFIG_REQ_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "()\\R?" + + "(\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|" + + "(<" + TargetConfig.RUNNING.toString() + "/>)|" + + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?)\\R?" + + "()\\R?" + + ".*" + + "()\\R?()\\R?()\\R?", Pattern.DOTALL); + + + public static final Pattern LOCK_REQ_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "()\\R?" + + "(\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|" + + "(<" + TargetConfig.RUNNING.toString() + "/>)|" + + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?)\\R?" + + "()\\R?()\\R?", Pattern.DOTALL); + + public static final Pattern UNLOCK_REQ_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "()\\R?" + + "(\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|" + + "(<" + TargetConfig.RUNNING.toString() + "/>)|" + + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?)\\R?" + + "()\\R?()\\R?", Pattern.DOTALL); + + public static final Pattern COPY_CONFIG_REQ_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "()\\R?" + + "(\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|" + + "(<" + TargetConfig.RUNNING.toString() + "/>)|" + + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?)\\R?" + + "()\\R?()((" + + TargetConfig.CANDIDATE.toString() + ")|(" + + TargetConfig.RUNNING.toString() + ")|(" + + TargetConfig.STARTUP.toString() + + "))()\\R?()\\R?" + + "()\\R?()\\R?", Pattern.DOTALL); + + public static final Pattern GET_CONFIG_REQ_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "()\\R?" + "()\\R?((<" + + TargetConfig.CANDIDATE.toString() + + "/>)|(<" + TargetConfig.RUNNING.toString() + + "/>)|(<" + TargetConfig.STARTUP.toString() + + "/>))\\R?()\\R?" + + "().*()\\R?" + + "()\\R?()\\R?", Pattern.DOTALL); + + public static final Pattern GET_REPLY_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "().*()\\R?" + + "()\\R?", Pattern.DOTALL); + + public static final Pattern GET_REQ_PATTERN = + Pattern.compile("(<\\?xml).*" + + "()\\R?" + + "()\\R?" + + "().*()\\R?" + + "()\\R?()\\R?", Pattern.DOTALL); + + public class NCCopyConfigCallable implements Callable { + private NetconfSession session; + private TargetConfig target; + private String source; + + public NCCopyConfigCallable(NetconfSession session, TargetConfig target, String source) { + this.session = session; + this.target = target; + this.source = source; + } + + @Override + public Boolean call() throws Exception { + return session.copyConfig(target, source); + } + } +} diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java index af1134c7bd..cc1cbf9596 100644 --- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java +++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java @@ -22,14 +22,15 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; +import java.nio.Buffer; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.sshd.common.NamedFactory; -import org.apache.sshd.common.util.Buffer; -import org.apache.sshd.common.util.ThreadUtils; +import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.sshd.server.Command; import org.apache.sshd.server.Environment; import org.apache.sshd.server.ExitCallback; @@ -188,11 +189,12 @@ public class NetconfSshdTestSubsystem extends Thread implements Command, Runnabl deviceRequest = deviceRequest.replace(END_PATTERN, ""); Optional messageId = NetconfStreamThread.getMsgId(deviceRequest); log.info("Client Request on session {}. MsgId {}: {}", - session.getId(), messageId, deviceRequest); + session.getSessionId(), messageId, deviceRequest); synchronized (outputStream) { if (NetconfSessionImplTest.HELLO_REQ_PATTERN.matcher(deviceRequest).matches()) { String helloReply = - NetconfSessionImplTest.getTestHelloReply(Optional.of(session.getId())); + NetconfSessionImplTest.getTestHelloReply(Optional.of(ByteBuffer.wrap( + session.getSessionId()).asLongBuffer().get())); outputStream.write(helloReply + END_PATTERN); outputStream.flush(); } else if (NetconfSessionImplTest.EDIT_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches() @@ -211,7 +213,8 @@ public class NetconfSshdTestSubsystem extends Thread implements Command, Runnabl outputStream.flush(); } else { log.error("Unexpected NETCONF message structure on session {} : {}", - session.getId(), deviceRequest); + ByteBuffer.wrap( + session.getSessionId()).asLongBuffer().get(), deviceRequest); } } deviceRequestBuilder.setLength(0); diff --git a/providers/netconf/BUCK b/providers/netconf/BUCK index bb0b23b532..4b077ad766 100644 --- a/providers/netconf/BUCK +++ b/providers/netconf/BUCK @@ -3,6 +3,7 @@ BUNDLES = [ '//providers/netconf/alarm:onos-providers-netconf-alarm', '//protocols/netconf/api:onos-protocols-netconf-api', '//protocols/netconf/ctl:onos-protocols-netconf-ctl', + '//lib:sshd-core', ] EXCLUDED_BUNDLES = [ diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java index 5ddbba099a..41a6ed1029 100644 --- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java +++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java @@ -371,7 +371,7 @@ public class NetconfDeviceProvider extends AbstractProvider private void checkAndUpdateDevice(DeviceId deviceId, DeviceDescription deviceDescription) { Device device = deviceService.getDevice(deviceId); if (device == null) { - log.warn("Device {} has not been added to store, " + + log.debug("Device {} has not been added to store, " + "since it's not reachable", deviceId); } else { boolean isReachable = isReachable(deviceId);