Enabling role timeout configuration on the Device Manager

Change-Id: Ie8664189d236431a5f2ee42476d0f99fdfe5f1d7
(cherry picked from commit 32762fa3a3fce3f0cc6d97e9fadca2959080ad08)
This commit is contained in:
Andrea Campanella 2022-06-29 16:24:00 +02:00
parent fc25e87c54
commit a42a6ab206
4 changed files with 82 additions and 8 deletions

View File

@ -22,6 +22,7 @@ import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import org.onlab.util.KryoNamespace; import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools; import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId; import org.onosproject.cluster.NodeId;
@ -70,21 +71,25 @@ import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer; import org.onosproject.store.service.Serializer;
import org.onosproject.upgrade.UpgradeService; import org.onosproject.upgrade.UpgradeService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference; import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.ReferenceCardinality; import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.time.Instant; import java.time.Instant;
import java.util.Collection; import java.util.Collection;
import java.util.Dictionary;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.Properties;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -97,15 +102,19 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.Multimaps.newListMultimap; import static com.google.common.collect.Multimaps.newListMultimap;
import static com.google.common.collect.Multimaps.synchronizedListMultimap; import static com.google.common.collect.Multimaps.synchronizedListMultimap;
import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.MastershipRole.MASTER; import static org.onosproject.net.MastershipRole.MASTER;
import static org.onosproject.net.MastershipRole.NONE; import static org.onosproject.net.MastershipRole.NONE;
import static org.onosproject.net.MastershipRole.STANDBY; import static org.onosproject.net.MastershipRole.STANDBY;
import static org.onosproject.net.device.impl.OsgiPropertyConstants.ROLE_TIMEOUT_SECONDS;
import static org.onosproject.net.device.impl.OsgiPropertyConstants.ROLE_TIMEOUT_SECONDS_DEFAULT;
import static org.onosproject.security.AppGuard.checkPermission; import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.DEVICE_READ; import static org.onosproject.security.AppPermission.Type.DEVICE_READ;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -115,7 +124,10 @@ import static org.slf4j.LoggerFactory.getLogger;
*/ */
@Component(immediate = true, @Component(immediate = true,
service = {DeviceService.class, DeviceAdminService.class, service = {DeviceService.class, DeviceAdminService.class,
DeviceProviderRegistry.class, PortConfigOperatorRegistry.class }) DeviceProviderRegistry.class, PortConfigOperatorRegistry.class },
property = {
ROLE_TIMEOUT_SECONDS + ":Integer=" + ROLE_TIMEOUT_SECONDS_DEFAULT
})
public class DeviceManager public class DeviceManager
extends AbstractListenerProviderRegistry<DeviceEvent, DeviceListener, DeviceProvider, DeviceProviderService> extends AbstractListenerProviderRegistry<DeviceEvent, DeviceListener, DeviceProvider, DeviceProviderService>
implements DeviceService, DeviceAdminService, DeviceProviderRegistry, PortConfigOperatorRegistry { implements DeviceService, DeviceAdminService, DeviceProviderRegistry, PortConfigOperatorRegistry {
@ -162,6 +174,9 @@ public class DeviceManager
@Reference(cardinality = ReferenceCardinality.MANDATORY) @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterCommunicationService communicationService; protected ClusterCommunicationService communicationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService cfgService;
private ExecutorService clusterRequestExecutor; private ExecutorService clusterRequestExecutor;
/** /**
* List of all registered PortConfigOperator. * List of all registered PortConfigOperator.
@ -214,14 +229,22 @@ public class DeviceManager
private final Map<DeviceId, Long> roleToAcknowledge = private final Map<DeviceId, Long> roleToAcknowledge =
Maps.newConcurrentMap(); Maps.newConcurrentMap();
private ScheduledExecutorService backgroundRoleChecker; private ScheduledExecutorService backgroundRoleChecker;
private static final int ROLE_TIMEOUT_SECONDS = 10;
/**
* Timeout for role acknowledgement check.
**/
protected int roleTimeoutSeconds = ROLE_TIMEOUT_SECONDS_DEFAULT;
// FIXME join this map with roleToAcknowledge and fix the back to back event issue here // FIXME join this map with roleToAcknowledge and fix the back to back event issue here
private final Map<DeviceId, MastershipRole> lastAcknowledgedRole = private final Map<DeviceId, MastershipRole> lastAcknowledgedRole =
Maps.newConcurrentMap(); Maps.newConcurrentMap();
@Activate @Activate
public void activate() { public void activate(ComponentContext context) {
cfgService.registerProperties(getClass());
modified(context);
portAnnotationOp = new PortAnnotationOperator(networkConfigService); portAnnotationOp = new PortAnnotationOperator(networkConfigService);
deviceAnnotationOp = new DeviceAnnotationOperator(networkConfigService); deviceAnnotationOp = new DeviceAnnotationOperator(networkConfigService);
portOpsIndex.put(PortAnnotationConfig.class, portAnnotationOp); portOpsIndex.put(PortAnnotationConfig.class, portAnnotationOp);
@ -271,8 +294,25 @@ public class DeviceManager
log.info("Started"); log.info("Started");
} }
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
String roleTimeoutSec = get(properties, ROLE_TIMEOUT_SECONDS);
int oldRoleTimeoutSeconds = roleTimeoutSeconds;
try {
roleTimeoutSeconds = isNullOrEmpty(roleTimeoutSec) ?
oldRoleTimeoutSeconds : Integer.parseInt(roleTimeoutSec.trim());
} catch (NumberFormatException e) {
log.warn("Can't parse {}, setting the old value {}", roleTimeoutSec, oldRoleTimeoutSeconds, e);
roleTimeoutSeconds = oldRoleTimeoutSeconds;
}
log.info("Modified. Values = {}: {}",
ROLE_TIMEOUT_SECONDS, roleTimeoutSeconds);
}
@Deactivate @Deactivate
public void deactivate() { public void deactivate(ComponentContext context) {
cfgService.unregisterProperties(getClass(), true);
backgroundService.shutdown(); backgroundService.shutdown();
networkConfigService.removeListener(networkConfigListener); networkConfigService.removeListener(networkConfigListener);
store.unsetDelegate(delegate); store.unsetDelegate(delegate);
@ -613,7 +653,7 @@ public class DeviceManager
return null; return null;
} }
exists.set(true); exists.set(true);
if (currentTimeMillis() - value < (ROLE_TIMEOUT_SECONDS * 1000)) { if (currentTimeMillis() - value < (roleTimeoutSeconds * 1000)) {
return value; return value;
} }
return null; return null;
@ -625,7 +665,7 @@ public class DeviceManager
} }
// Timeout still on // Timeout still on
if (ts != null) { if (ts != null) {
log.debug("Timeout expires in {} ms", ((ROLE_TIMEOUT_SECONDS * 1000) - currentTimeMillis() + ts)); log.debug("Timeout expires in {} ms", ((roleTimeoutSeconds * 1000) - currentTimeMillis() + ts));
continue; continue;
} }
if (myRole != MASTER) { if (myRole != MASTER) {

View File

@ -0,0 +1,28 @@
/*
* Copyright 2022-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.device.impl;
/**
* Constants for default values of configurable properties.
*/
public final class OsgiPropertyConstants {
private OsgiPropertyConstants() {
}
public static final String ROLE_TIMEOUT_SECONDS = "roleTimeoutSeconds";
public static final int ROLE_TIMEOUT_SECONDS_DEFAULT = 10;
}

View File

@ -19,8 +19,10 @@ import com.google.common.collect.Sets;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.onlab.osgi.ComponentContextAdapter;
import org.onlab.packet.ChassisId; import org.onlab.packet.ChassisId;
import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.cluster.ClusterServiceAdapter; import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode; import org.onosproject.cluster.DefaultControllerNode;
@ -100,6 +102,7 @@ public class DeviceManagerTest {
service = mgr; service = mgr;
admin = mgr; admin = mgr;
registry = mgr; registry = mgr;
mgr.cfgService = new ComponentConfigAdapter();
mgr.store = new SimpleDeviceStore(); mgr.store = new SimpleDeviceStore();
injectEventDispatcher(mgr, new TestEventDispatcher()); injectEventDispatcher(mgr, new TestEventDispatcher());
TestMastershipManager mastershipManager = new TestMastershipManager(); TestMastershipManager mastershipManager = new TestMastershipManager();
@ -108,7 +111,7 @@ public class DeviceManagerTest {
mgr.clusterService = new TestClusterService(); mgr.clusterService = new TestClusterService();
mgr.networkConfigService = new TestNetworkConfigService(); mgr.networkConfigService = new TestNetworkConfigService();
mgr.communicationService = new TestClusterCommunicationService(); mgr.communicationService = new TestClusterCommunicationService();
mgr.activate(); mgr.activate(new ComponentContextAdapter());
service.addListener(listener); service.addListener(listener);
@ -125,7 +128,7 @@ public class DeviceManagerTest {
assertFalse("provider should not be registered", assertFalse("provider should not be registered",
registry.getProviders().contains(provider.id())); registry.getProviders().contains(provider.id()));
service.removeListener(listener); service.removeListener(listener);
mgr.deactivate(); mgr.deactivate(new ComponentContextAdapter());
} }
private void connectDevice(DeviceId deviceId, String swVersion) { private void connectDevice(DeviceId deviceId, String swVersion) {

View File

@ -244,6 +244,9 @@ class OFChannelHandler extends ChannelInboundHandlerAdapter
* Executor is instantiated as a single thread executor guaranteeing processing * Executor is instantiated as a single thread executor guaranteeing processing
* of device status messages in order. * of device status messages in order.
*/ */
// TODO With a huge number of ports per device congestion can be created at the runtimeExecutor,
// leading to mastership roles check going into timeout, with issues in multi-instance mastership handling.
// An option is to Experiment with priority tasks - to give priority (under certain conditions) to the ROLE_REPLY
protected ExecutorService runtimeStatusExecutor; protected ExecutorService runtimeStatusExecutor;
/** /**