mirror of
https://github.com/opennetworkinglab/onos.git
synced 2026-05-05 04:06:49 +02:00
Post DatabaseManager deprecation code cleanup
- Dropping MutexExecutionService as there are now better alternatives - Dropping New from class names that were added during transition phase Change-Id: If0cdd3321081c3f8fda81441ef2c84549b616edd
This commit is contained in:
parent
d4489882ec
commit
832686da5f
@ -21,9 +21,7 @@ import org.apache.karaf.shell.commands.Command;
|
||||
import org.onosproject.cli.AbstractShellCommand;
|
||||
import org.onosproject.store.service.StorageAdminService;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
/**
|
||||
@ -35,73 +33,17 @@ public class CountersListCommand extends AbstractShellCommand {
|
||||
|
||||
private static final String FMT = "name=%s value=%d";
|
||||
|
||||
/**
|
||||
* Displays counters as text.
|
||||
*
|
||||
* @param counters counter info
|
||||
*/
|
||||
private void displayCounters(Map<String, Long> counters) {
|
||||
counters.forEach((name, value) -> print(FMT, name, value));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts info for counters into a JSON object.
|
||||
*
|
||||
* @param counters counter info
|
||||
*/
|
||||
private JsonNode json(Map<String, Long> counters) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
ArrayNode jsonCounters = mapper.createArrayNode();
|
||||
|
||||
// Create a JSON node for each counter
|
||||
counters.forEach((name, value) -> {
|
||||
ObjectNode jsonCounter = mapper.createObjectNode();
|
||||
jsonCounter.put("name", name)
|
||||
.put("value", value);
|
||||
jsonCounters.add(jsonCounter);
|
||||
});
|
||||
|
||||
return jsonCounters;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts info for counters from different databases into a JSON object.
|
||||
*
|
||||
* @param partitionedDbCounters counters info
|
||||
* @param inMemoryDbCounters counters info
|
||||
*/
|
||||
private JsonNode jsonAllCounters(Map<String, Long> partitionedDbCounters,
|
||||
Map<String, Long> inMemoryDbCounters) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
ArrayNode jsonCounters = mapper.createArrayNode();
|
||||
|
||||
// Create a JSON node for partitioned database counter
|
||||
ObjectNode jsonPartitionedDatabaseCounters = mapper.createObjectNode();
|
||||
jsonPartitionedDatabaseCounters.set("partitionedDatabaseCounters",
|
||||
json(partitionedDbCounters));
|
||||
jsonCounters.add(jsonPartitionedDatabaseCounters);
|
||||
// Create a JSON node for in-memory database counter
|
||||
ObjectNode jsonInMemoryDatabseCounters = mapper.createObjectNode();
|
||||
jsonInMemoryDatabseCounters.set("inMemoryDatabaseCounters",
|
||||
json(inMemoryDbCounters));
|
||||
jsonCounters.add(jsonInMemoryDatabseCounters);
|
||||
|
||||
return jsonCounters;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void execute() {
|
||||
StorageAdminService storageAdminService = get(StorageAdminService.class);
|
||||
Map<String, Long> partitionedDatabaseCounters = storageAdminService.getPartitionedDatabaseCounters();
|
||||
Map<String, Long> inMemoryDatabaseCounters = storageAdminService.getInMemoryDatabaseCounters();
|
||||
Map<String, Long> counters = storageAdminService.getCounters();
|
||||
if (outputJson()) {
|
||||
print("%s", jsonAllCounters(partitionedDatabaseCounters, inMemoryDatabaseCounters));
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
ObjectNode jsonCounters = mapper.createObjectNode();
|
||||
counters.forEach((k, v) -> jsonCounters.put(k, v));
|
||||
print("%s", jsonCounters);
|
||||
} else {
|
||||
print("Partitioned database counters:");
|
||||
displayCounters(partitionedDatabaseCounters);
|
||||
print("In-memory database counters:");
|
||||
displayCounters(inMemoryDatabaseCounters);
|
||||
counters.keySet().stream().sorted().forEach(name -> print(FMT, name, counters.get(name)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,24 +50,6 @@ public interface DistributedQueueBuilder<E> {
|
||||
*/
|
||||
DistributedQueueBuilder<E> withSerializer(Serializer serializer);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @return this DistributedQueueBuilder for method chaining
|
||||
*/
|
||||
DistributedQueueBuilder<E> withMeteringDisabled();
|
||||
|
||||
|
||||
/**
|
||||
* Disables persistence of queues entries.
|
||||
* <p>
|
||||
* When persistence is disabled, a full cluster restart will wipe out all
|
||||
* queue entries.
|
||||
* </p>
|
||||
* @return this DistributedQueueBuilder for method chaining
|
||||
*/
|
||||
DistributedQueueBuilder<E> withPersistenceDisabled();
|
||||
|
||||
/**
|
||||
* Builds a queue based on the configuration options
|
||||
* supplied to this builder.
|
||||
|
||||
@ -1,34 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.onosproject.store.service;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
* Service for mutually exclusive job execution.
|
||||
*/
|
||||
public interface MutexExecutionService {
|
||||
|
||||
/**
|
||||
* Runs the specified task in a mutually exclusive fashion.
|
||||
* @param task task to run
|
||||
* @param exclusionPath path on which different instances synchronize
|
||||
* @param executor executor to use for running the task
|
||||
* @return future that is completed when the task execution completes.
|
||||
*/
|
||||
CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor);
|
||||
}
|
||||
@ -46,33 +46,11 @@ public interface StorageAdminService {
|
||||
|
||||
/**
|
||||
* Returns information about all the atomic counters in the system.
|
||||
* If 2 counters belonging to 2 different databases have the same name,
|
||||
* then only one counter from one database is returned.
|
||||
*
|
||||
* @return mapping from counter name to that counter's next value
|
||||
* @deprecated 1.5.0 Falcon Release
|
||||
*/
|
||||
@Deprecated
|
||||
Map<String, Long> getCounters();
|
||||
|
||||
/**
|
||||
* Returns information about all the atomic partitioned database counters in the system.
|
||||
*
|
||||
* @return mapping from counter name to that counter's next value
|
||||
* @deprecated 1.5.0 Falcon Release
|
||||
*/
|
||||
@Deprecated
|
||||
Map<String, Long> getPartitionedDatabaseCounters();
|
||||
|
||||
/**
|
||||
* Returns information about all the atomic in-memory database counters in the system.
|
||||
*
|
||||
* @return mapping from counter name to that counter's next value
|
||||
* @deprecated 1.5.0 Falcon Release
|
||||
*/
|
||||
@Deprecated
|
||||
Map<String, Long> getInMemoryDatabaseCounters();
|
||||
|
||||
/**
|
||||
* Returns all pending transactions.
|
||||
*
|
||||
|
||||
@ -73,7 +73,6 @@ import org.onosproject.store.cluster.messaging.MessagingService;
|
||||
import org.onosproject.store.primitives.PartitionAdminService;
|
||||
import org.onosproject.store.primitives.PartitionService;
|
||||
import org.onosproject.store.service.LogicalClockService;
|
||||
import org.onosproject.store.service.MutexExecutionService;
|
||||
import org.onosproject.store.service.StorageAdminService;
|
||||
import org.onosproject.store.service.StorageService;
|
||||
import org.onosproject.ui.UiExtensionService;
|
||||
@ -248,7 +247,6 @@ public final class DefaultPolicyBuilder {
|
||||
permSet.add(new ServicePermission(MessagingService.class.getName(), ServicePermission.GET));
|
||||
permSet.add(new ServicePermission(PartitionService.class.getName(), ServicePermission.GET));
|
||||
permSet.add(new ServicePermission(LogicalClockService.class.getName(), ServicePermission.GET));
|
||||
permSet.add(new ServicePermission(MutexExecutionService.class.getName(), ServicePermission.GET));
|
||||
permSet.add(new ServicePermission(StorageService.class.getName(), ServicePermission.GET));
|
||||
permSet.add(new ServicePermission(UiExtensionService.class.getName(), ServicePermission.GET));
|
||||
|
||||
@ -376,8 +374,6 @@ public final class DefaultPolicyBuilder {
|
||||
PartitionService.class.getName()));
|
||||
serviceDirectory.put(CLOCK_WRITE, ImmutableSet.of(
|
||||
LogicalClockService.class.getName()));
|
||||
serviceDirectory.put(MUTEX_WRITE, ImmutableSet.of(
|
||||
MutexExecutionService.class.getName()));
|
||||
|
||||
return serviceDirectory;
|
||||
}
|
||||
|
||||
@ -34,7 +34,11 @@ import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.onosproject.utils.MeteringAgent;
|
||||
|
||||
|
||||
/**
|
||||
* Default implementation of a {@code AsyncAtomicValue}.
|
||||
*
|
||||
* @param <V> value type
|
||||
*/
|
||||
public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
|
||||
|
||||
private final String name;
|
||||
|
||||
@ -22,11 +22,11 @@ import org.onosproject.store.service.AtomicCounterBuilder;
|
||||
/**
|
||||
* Default implementation of AtomicCounterBuilder.
|
||||
*/
|
||||
public class NewDefaultAtomicCounterBuilder extends AtomicCounterBuilder {
|
||||
public class DefaultAtomicCounterBuilder extends AtomicCounterBuilder {
|
||||
|
||||
private final DistributedPrimitiveCreator primitiveCreator;
|
||||
|
||||
public NewDefaultAtomicCounterBuilder(DistributedPrimitiveCreator primitiveCreator) {
|
||||
public DefaultAtomicCounterBuilder(DistributedPrimitiveCreator primitiveCreator) {
|
||||
this.primitiveCreator = primitiveCreator;
|
||||
}
|
||||
|
||||
@ -26,11 +26,11 @@ import org.onosproject.store.service.ConsistentMapBuilder;
|
||||
* @param <K> type for map key
|
||||
* @param <V> type for map value
|
||||
*/
|
||||
public class NewDefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
|
||||
public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
|
||||
|
||||
private final DistributedPrimitiveCreator primitiveCreator;
|
||||
|
||||
public NewDefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
|
||||
public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
|
||||
this.primitiveCreator = primitiveCreator;
|
||||
}
|
||||
|
||||
@ -28,15 +28,13 @@ import static com.google.common.base.Preconditions.checkState;
|
||||
*
|
||||
* @param <E> queue entry type
|
||||
*/
|
||||
public class NewDefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
|
||||
public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
|
||||
|
||||
private final DistributedPrimitiveCreator primitiveCreator;
|
||||
private String name;
|
||||
private boolean persistenceEnabled = true;
|
||||
private boolean metering = true;
|
||||
private Serializer serializer;
|
||||
|
||||
public NewDefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) {
|
||||
public DefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) {
|
||||
this.primitiveCreator = primitiveCreator;
|
||||
}
|
||||
|
||||
@ -54,18 +52,6 @@ public class NewDefaultDistributedQueueBuilder<E> implements DistributedQueueBui
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DistributedQueueBuilder<E> withMeteringDisabled() {
|
||||
metering = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DistributedQueueBuilder<E> withPersistenceDisabled() {
|
||||
persistenceEnabled = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
private boolean validInputs() {
|
||||
return name != null && serializer != null;
|
||||
}
|
||||
@ -32,7 +32,7 @@ import com.google.common.collect.Sets;
|
||||
/**
|
||||
* Default implementation of transaction context.
|
||||
*/
|
||||
public class NewDefaultTransactionContext implements TransactionContext {
|
||||
public class DefaultTransactionContext implements TransactionContext {
|
||||
|
||||
private final AtomicBoolean isOpen = new AtomicBoolean(false);
|
||||
private final DistributedPrimitiveCreator creator;
|
||||
@ -41,7 +41,7 @@ public class NewDefaultTransactionContext implements TransactionContext {
|
||||
private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
|
||||
private final MeteringAgent monitor;
|
||||
|
||||
public NewDefaultTransactionContext(TransactionId transactionId,
|
||||
public DefaultTransactionContext(TransactionId transactionId,
|
||||
DistributedPrimitiveCreator creator,
|
||||
TransactionCoordinator transactionCoordinator) {
|
||||
this.transactionId = transactionId;
|
||||
@ -22,13 +22,13 @@ import org.onosproject.store.service.TransactionContextBuilder;
|
||||
/**
|
||||
* Default Transaction Context Builder.
|
||||
*/
|
||||
public class NewDefaultTransactionContextBuilder extends TransactionContextBuilder {
|
||||
public class DefaultTransactionContextBuilder extends TransactionContextBuilder {
|
||||
|
||||
private final TransactionId transactionId;
|
||||
private final DistributedPrimitiveCreator primitiveCreator;
|
||||
private final TransactionCoordinator transactionCoordinator;
|
||||
|
||||
public NewDefaultTransactionContextBuilder(TransactionId transactionId,
|
||||
public DefaultTransactionContextBuilder(TransactionId transactionId,
|
||||
DistributedPrimitiveCreator primitiveCreator,
|
||||
TransactionCoordinator transactionCoordinator) {
|
||||
this.transactionId = transactionId;
|
||||
@ -38,7 +38,7 @@ public class NewDefaultTransactionContextBuilder extends TransactionContextBuild
|
||||
|
||||
@Override
|
||||
public TransactionContext build() {
|
||||
return new NewDefaultTransactionContext(transactionId,
|
||||
return new DefaultTransactionContext(transactionId,
|
||||
primitiveCreator,
|
||||
transactionCoordinator);
|
||||
}
|
||||
@ -1,317 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Open Networking Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.onosproject.store.primitives.impl;
|
||||
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Deactivate;
|
||||
import org.apache.felix.scr.annotations.Reference;
|
||||
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
||||
import org.apache.felix.scr.annotations.Service;
|
||||
import org.onlab.util.Tools;
|
||||
import org.onosproject.cluster.ClusterEvent;
|
||||
import org.onosproject.cluster.ClusterEventListener;
|
||||
import org.onosproject.cluster.ClusterService;
|
||||
import org.onosproject.cluster.ControllerNode.State;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.store.serializers.KryoNamespaces;
|
||||
import org.onosproject.store.service.ConsistentMap;
|
||||
import org.onosproject.store.service.ConsistentMapException;
|
||||
import org.onosproject.store.service.MapEvent;
|
||||
import org.onosproject.store.service.MapEventListener;
|
||||
import org.onosproject.store.service.MutexExecutionService;
|
||||
import org.onosproject.store.service.MutexTask;
|
||||
import org.onosproject.store.service.Serializer;
|
||||
import org.onosproject.store.service.StorageService;
|
||||
import org.onosproject.store.service.Versioned;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import static org.onosproject.security.AppGuard.checkPermission;
|
||||
import static org.onosproject.security.AppPermission.Type.MUTEX_WRITE;
|
||||
/**
|
||||
* Implementation of a MutexExecutionService.
|
||||
*/
|
||||
@Component(immediate = true)
|
||||
@Service
|
||||
public class MutexExecutionManager implements MutexExecutionService {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
protected ConsistentMap<String, MutexState> lockMap;
|
||||
protected NodeId localNodeId;
|
||||
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||
protected ClusterService clusterService;
|
||||
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||
protected StorageService storageService;
|
||||
|
||||
private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
|
||||
private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
|
||||
|
||||
private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
|
||||
private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();
|
||||
|
||||
@Activate
|
||||
public void activate() {
|
||||
localNodeId = clusterService.getLocalNode().id();
|
||||
lockMap = storageService.<String, MutexState>consistentMapBuilder()
|
||||
.withName("onos-mutexes")
|
||||
.withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class))
|
||||
.withPartitionsDisabled()
|
||||
.build();
|
||||
lockMap.addListener(mapEventListener);
|
||||
clusterService.addListener(clusterEventListener);
|
||||
releaseOldLocks();
|
||||
log.info("Started");
|
||||
}
|
||||
|
||||
@Deactivate
|
||||
public void deactivate() {
|
||||
lockMap.removeListener(mapEventListener);
|
||||
pending.values().forEach(future -> future.cancel(true));
|
||||
activeTasks.forEach((k, v) -> {
|
||||
v.stop();
|
||||
unlock(k);
|
||||
});
|
||||
clusterService.removeListener(clusterEventListener);
|
||||
log.info("Stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
|
||||
checkPermission(MUTEX_WRITE);
|
||||
return lock(exclusionPath)
|
||||
.thenApply(state -> activeTasks.computeIfAbsent(exclusionPath,
|
||||
k -> new InnerMutexTask(exclusionPath,
|
||||
task,
|
||||
state.term())))
|
||||
.thenAcceptAsync(t -> t.start(), executor)
|
||||
.whenComplete((r, e) -> unlock(exclusionPath));
|
||||
}
|
||||
|
||||
protected CompletableFuture<MutexState> lock(String exclusionPath) {
|
||||
CompletableFuture<MutexState> future =
|
||||
pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>());
|
||||
tryLock(exclusionPath);
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to acquire lock for a path. If lock is held by some other node, adds this node to
|
||||
* the wait list.
|
||||
* @param exclusionPath exclusion path
|
||||
*/
|
||||
protected void tryLock(String exclusionPath) {
|
||||
Tools.retryable(() -> lockMap.asJavaMap()
|
||||
.compute(exclusionPath,
|
||||
(k, v) -> MutexState.admit(v, localNodeId)),
|
||||
ConsistentMapException.ConcurrentModification.class,
|
||||
Integer.MAX_VALUE,
|
||||
100).get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases lock for the specific path. This operation is idempotent.
|
||||
* @param exclusionPath exclusion path
|
||||
*/
|
||||
protected void unlock(String exclusionPath) {
|
||||
Tools.retryable(() -> lockMap.asJavaMap()
|
||||
.compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)),
|
||||
ConsistentMapException.ConcurrentModification.class,
|
||||
Integer.MAX_VALUE,
|
||||
100).get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Detects and releases all locks held by this node.
|
||||
*/
|
||||
private void releaseOldLocks() {
|
||||
Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder()))
|
||||
.keySet()
|
||||
.forEach(path -> {
|
||||
log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
|
||||
unlock(path);
|
||||
});
|
||||
}
|
||||
|
||||
private class InternalLockMapEventListener implements MapEventListener<String, MutexState> {
|
||||
|
||||
@Override
|
||||
public void event(MapEvent<String, MutexState> event) {
|
||||
log.debug("Received {}", event);
|
||||
if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
|
||||
pending.computeIfPresent(event.key(), (k, future) -> {
|
||||
MutexState state = Versioned.valueOrElse(event.value(), null);
|
||||
if (state != null && localNodeId.equals(state.holder())) {
|
||||
log.debug("Local node is now owner for {}", event.key());
|
||||
future.complete(state);
|
||||
return null;
|
||||
} else {
|
||||
return future;
|
||||
}
|
||||
});
|
||||
InnerMutexTask task = activeTasks.get(event.key());
|
||||
if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) {
|
||||
task.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class InternalClusterEventListener implements ClusterEventListener {
|
||||
|
||||
@Override
|
||||
public void event(ClusterEvent event) {
|
||||
if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED ||
|
||||
event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
|
||||
NodeId nodeId = event.subject().id();
|
||||
log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId);
|
||||
lockMap.asJavaMap().forEach((k, v) -> {
|
||||
if (v.contains(nodeId)) {
|
||||
lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
|
||||
}
|
||||
});
|
||||
}
|
||||
long activeNodes = clusterService.getNodes()
|
||||
.stream()
|
||||
.map(node -> clusterService.getState(node.id()))
|
||||
.filter(State::isActive)
|
||||
.count();
|
||||
if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
|
||||
log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
|
||||
activeTasks.forEach((k, v) -> {
|
||||
v.stop();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class MutexState {
|
||||
|
||||
private final NodeId holder;
|
||||
private final List<NodeId> waitList;
|
||||
private final long term;
|
||||
|
||||
public static MutexState admit(MutexState state, NodeId nodeId) {
|
||||
if (state == null) {
|
||||
return new MutexState(nodeId, 1L, Lists.newArrayList());
|
||||
} else if (state.holder() == null) {
|
||||
return new MutexState(nodeId, state.term() + 1, Lists.newArrayList());
|
||||
} else {
|
||||
if (!state.contains(nodeId)) {
|
||||
NodeId newHolder = state.holder();
|
||||
List<NodeId> newWaitList = Lists.newArrayList(state.waitList());
|
||||
newWaitList.add(nodeId);
|
||||
return new MutexState(newHolder, state.term(), newWaitList);
|
||||
} else {
|
||||
return state;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static MutexState evict(MutexState state, NodeId nodeId) {
|
||||
return state.evict(nodeId);
|
||||
}
|
||||
|
||||
public MutexState evict(NodeId nodeId) {
|
||||
if (nodeId.equals(holder)) {
|
||||
if (waitList.isEmpty()) {
|
||||
return new MutexState(null, term, waitList);
|
||||
}
|
||||
List<NodeId> newWaitList = Lists.newArrayList(waitList);
|
||||
NodeId newHolder = newWaitList.remove(0);
|
||||
return new MutexState(newHolder, term + 1, newWaitList);
|
||||
} else {
|
||||
NodeId newHolder = holder;
|
||||
List<NodeId> newWaitList = Lists.newArrayList(waitList);
|
||||
newWaitList.remove(nodeId);
|
||||
return new MutexState(newHolder, term, newWaitList);
|
||||
}
|
||||
}
|
||||
|
||||
public NodeId holder() {
|
||||
return holder;
|
||||
}
|
||||
|
||||
public List<NodeId> waitList() {
|
||||
return waitList;
|
||||
}
|
||||
|
||||
public long term() {
|
||||
return term;
|
||||
}
|
||||
|
||||
private boolean contains(NodeId nodeId) {
|
||||
return (nodeId.equals(holder) || waitList.contains(nodeId));
|
||||
}
|
||||
|
||||
private MutexState(NodeId holder, long term, List<NodeId> waitList) {
|
||||
this.holder = holder;
|
||||
this.term = term;
|
||||
this.waitList = Lists.newArrayList(waitList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(getClass())
|
||||
.add("holder", holder)
|
||||
.add("term", term)
|
||||
.add("waitList", waitList)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private class InnerMutexTask implements MutexTask {
|
||||
private final MutexTask task;
|
||||
private final String mutexPath;
|
||||
private final long term;
|
||||
|
||||
public InnerMutexTask(String mutexPath, MutexTask task, long term) {
|
||||
this.mutexPath = mutexPath;
|
||||
this.term = term;
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
public long term() {
|
||||
return term;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
log.debug("Starting execution for mutex task guarded by {}", mutexPath);
|
||||
task.start();
|
||||
log.debug("Finished execution for mutex task guarded by {}", mutexPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
log.debug("Stopping execution for mutex task guarded by {}", mutexPath);
|
||||
task.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -94,7 +94,7 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
|
||||
messagingService,
|
||||
clusterService,
|
||||
CatalystSerializers.getSerializer(),
|
||||
new File(System.getProperty("karaf.data") + "/data/" + partition.getId()))));
|
||||
new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
|
||||
|
||||
CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
|
||||
.stream()
|
||||
|
||||
@ -57,7 +57,6 @@ import org.onosproject.store.service.StorageService;
|
||||
import org.onosproject.store.service.TransactionContextBuilder;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
|
||||
@ -129,7 +128,7 @@ public class StorageManager implements StorageService, StorageAdminService {
|
||||
@Override
|
||||
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
|
||||
checkPermission(STORAGE_WRITE);
|
||||
return new NewDefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
|
||||
return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -141,13 +140,13 @@ public class StorageManager implements StorageService, StorageAdminService {
|
||||
@Override
|
||||
public <E> DistributedQueueBuilder<E> queueBuilder() {
|
||||
checkPermission(STORAGE_WRITE);
|
||||
return new NewDefaultDistributedQueueBuilder<>(federatedPrimitiveCreator);
|
||||
return new DefaultDistributedQueueBuilder<>(federatedPrimitiveCreator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AtomicCounterBuilder atomicCounterBuilder() {
|
||||
checkPermission(STORAGE_WRITE);
|
||||
return new NewDefaultAtomicCounterBuilder(federatedPrimitiveCreator);
|
||||
return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -163,7 +162,7 @@ public class StorageManager implements StorageService, StorageAdminService {
|
||||
@Override
|
||||
public TransactionContextBuilder transactionContextBuilder() {
|
||||
checkPermission(STORAGE_WRITE);
|
||||
return new NewDefaultTransactionContextBuilder(transactionIdGenerator.get(),
|
||||
return new DefaultTransactionContextBuilder(transactionIdGenerator.get(),
|
||||
federatedPrimitiveCreator,
|
||||
transactionCoordinator);
|
||||
}
|
||||
@ -181,26 +180,10 @@ public class StorageManager implements StorageService, StorageAdminService {
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getCounters() {
|
||||
Map<String, Long> result = Maps.newHashMap();
|
||||
result.putAll(getInMemoryDatabaseCounters());
|
||||
result.putAll(getPartitionedDatabaseCounters());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getInMemoryDatabaseCounters() {
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getPartitionedDatabaseCounters() {
|
||||
return getCounters(federatedPrimitiveCreator);
|
||||
}
|
||||
|
||||
public Map<String, Long> getCounters(DistributedPrimitiveCreator creator) {
|
||||
Map<String, Long> counters = Maps.newConcurrentMap();
|
||||
creator.getAsyncAtomicCounterNames()
|
||||
.forEach(name -> counters.put(name, creator.newAsyncCounter(name).asAtomicCounter().get()));
|
||||
federatedPrimitiveCreator.getAsyncAtomicCounterNames()
|
||||
.forEach(name -> counters.put(name,
|
||||
federatedPrimitiveCreator.newAsyncCounter(name).asAtomicCounter().get()));
|
||||
return counters;
|
||||
}
|
||||
|
||||
|
||||
@ -35,6 +35,8 @@ import org.onosproject.store.service.Versioned;
|
||||
*/
|
||||
public class UnmodifiableAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
|
||||
|
||||
private static final String ERROR_MSG = "map updates are not allowed";
|
||||
|
||||
public UnmodifiableAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
|
||||
super(backingMap);
|
||||
}
|
||||
@ -43,56 +45,56 @@ public class UnmodifiableAsyncConsistentMap<K, V> extends DelegatingAsyncConsist
|
||||
public CompletableFuture<Versioned<V>> computeIf(K key,
|
||||
Predicate<? super V> condition,
|
||||
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(""));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Versioned<V>> put(K key, V value) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Versioned<V>> remove(K key) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> clear() {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> remove(K key, V value) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> remove(K key, long version) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Versioned<V>> replace(K key, V value) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
|
||||
return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG));
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
|
||||
/**
|
||||
* Implementation of partitioned and distributed store facility capable of
|
||||
* providing consistent update semantics.
|
||||
* Implementation classes for various Distributed primitives.
|
||||
*/
|
||||
package org.onosproject.store.primitives.impl;
|
||||
Loading…
x
Reference in New Issue
Block a user