When a Copycat client is in SUSPENDED or CLOSED state fail-fast all its operations

Change-Id: I821ca0a488e68d004b4e41b6d8ac28368f09ffcb
This commit is contained in:
Madan Jampani 2016-06-06 17:15:25 -07:00
parent a4d2c72a05
commit d5b200f5f9
4 changed files with 74 additions and 35 deletions

View File

@ -32,6 +32,12 @@ public class StorageException extends RuntimeException {
super(t); super(t);
} }
/**
* Store is temporarily unavailable.
*/
public static class Unavailable extends StorageException {
}
/** /**
* Store operation timeout. * Store operation timeout.
*/ */

View File

@ -51,6 +51,7 @@ import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.DistributedQueue; import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.PartitionClientInfo; import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer; import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.slf4j.Logger; import org.slf4j.Logger;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
@ -119,6 +120,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override @Override
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) { public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
checkAvailability();
AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join(); AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
Consumer<State> statusListener = state -> { Consumer<State> statusListener = state -> {
atomixConsistentMap.statusChangeListeners() atomixConsistentMap.statusChangeListeners()
@ -143,11 +145,13 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override @Override
public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) { public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
checkAvailability();
return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer)); return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
} }
@Override @Override
public AsyncAtomicCounter newAsyncCounter(String name) { public AsyncAtomicCounter newAsyncCounter(String name) {
checkAvailability();
DistributedLong distributedLong = client.getLong(name).join(); DistributedLong distributedLong = client.getLong(name).join();
return new AtomixCounter(name, distributedLong); return new AtomixCounter(name, distributedLong);
} }
@ -165,6 +169,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) { public AsyncLeaderElector newAsyncLeaderElector(String name) {
checkAvailability();
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class) AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache) .thenCompose(AtomixLeaderElector::setupCache)
.join(); .join();
@ -178,11 +183,13 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override @Override
public Set<String> getAsyncConsistentMapNames() { public Set<String> getAsyncConsistentMapNames() {
checkAvailability();
return client.keys(AtomixConsistentMap.class).join(); return client.keys(AtomixConsistentMap.class).join();
} }
@Override @Override
public Set<String> getAsyncAtomicCounterNames() { public Set<String> getAsyncAtomicCounterNames() {
checkAvailability();
return client.keys(DistributedLong.class).join(); return client.keys(DistributedLong.class).join();
} }
@ -227,4 +234,10 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
} }
return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100)); return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100));
} }
private void checkAvailability() {
if (resourceClient.client().state() == State.SUSPENDED || resourceClient.client().state() == State.CLOSED) {
throw new StorageException.Unavailable();
}
}
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.onosproject.store.primitives.resources.impl; package org.onosproject.store.primitives.resources.impl;
import io.atomix.copycat.Operation;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo; import io.atomix.resource.ResourceTypeInfo;
@ -34,6 +35,7 @@ import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.onlab.util.Match; import org.onlab.util.Match;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId; import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear; import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey; import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
@ -55,6 +57,7 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent; import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener; import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction; import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Versioned; import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
@ -97,48 +100,48 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
public CompletableFuture<Boolean> isEmpty() { public CompletableFuture<Boolean> isEmpty() {
return client.submit(new IsEmpty()); return submit(new IsEmpty());
} }
@Override @Override
public CompletableFuture<Integer> size() { public CompletableFuture<Integer> size() {
return client.submit(new Size()); return submit(new Size());
} }
@Override @Override
public CompletableFuture<Boolean> containsKey(String key) { public CompletableFuture<Boolean> containsKey(String key) {
return client.submit(new ContainsKey(key)); return submit(new ContainsKey(key));
} }
@Override @Override
public CompletableFuture<Boolean> containsValue(byte[] value) { public CompletableFuture<Boolean> containsValue(byte[] value) {
return client.submit(new ContainsValue(value)); return submit(new ContainsValue(value));
} }
@Override @Override
public CompletableFuture<Versioned<byte[]>> get(String key) { public CompletableFuture<Versioned<byte[]>> get(String key) {
return client.submit(new Get(key)); return submit(new Get(key));
} }
@Override @Override
public CompletableFuture<Set<String>> keySet() { public CompletableFuture<Set<String>> keySet() {
return client.submit(new KeySet()); return submit(new KeySet());
} }
@Override @Override
public CompletableFuture<Collection<Versioned<byte[]>>> values() { public CompletableFuture<Collection<Versioned<byte[]>>> values() {
return client.submit(new Values()); return submit(new Values());
} }
@Override @Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() { public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
return client.submit(new EntrySet()); return submit(new EntrySet());
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) { public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY)) return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue()); .thenApply(v -> v.oldValue());
} }
@ -146,7 +149,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) { public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY)) return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.newValue()); .thenApply(v -> v.newValue());
} }
@ -154,14 +157,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) { public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY)) return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue()); .thenApply(v -> v.oldValue());
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) { public CompletableFuture<Versioned<byte[]>> remove(String key) {
return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY)) return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue()); .thenApply(v -> v.oldValue());
} }
@ -169,7 +172,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) { public CompletableFuture<Boolean> remove(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY)) return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated()); .thenApply(v -> v.updated());
} }
@ -177,7 +180,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) { public CompletableFuture<Boolean> remove(String key, long version) {
return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version))) return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated()); .thenApply(v -> v.updated());
} }
@ -185,7 +188,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) { public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY)) return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue()); .thenApply(v -> v.oldValue());
} }
@ -193,7 +196,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) { public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY)) return submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated()); .thenApply(v -> v.updated());
} }
@ -201,14 +204,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) { public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion))) return submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
.whenComplete((r, e) -> throwIfLocked(r.status())) .whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated()); .thenApply(v -> v.updated());
} }
@Override @Override
public CompletableFuture<Void> clear() { public CompletableFuture<Void> clear() {
return client.submit(new Clear()) return submit(new Clear())
.whenComplete((r, e) -> throwIfLocked(r)) .whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> null); .thenApply(v -> null);
} }
@ -239,7 +242,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
} }
Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY; Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version()); Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
return client.submit(new UpdateAndGet(key, return submit(new UpdateAndGet(key,
computedValue.get(), computedValue.get(),
valueMatch, valueMatch,
versionMatch)) versionMatch))
@ -252,7 +255,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener, public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Executor executor) { Executor executor) {
if (mapEventListeners.isEmpty()) { if (mapEventListeners.isEmpty()) {
return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor)); return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
} else { } else {
mapEventListeners.put(listener, executor); mapEventListeners.put(listener, executor);
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
@ -262,7 +265,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) { public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) { if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
return client.submit(new Unlisten()).thenApply(v -> null); return submit(new Unlisten()).thenApply(v -> null);
} }
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
@ -275,23 +278,23 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override @Override
public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) { public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK); return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
} }
@Override @Override
public CompletableFuture<Void> commit(TransactionId transactionId) { public CompletableFuture<Void> commit(TransactionId transactionId) {
return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null); return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
} }
@Override @Override
public CompletableFuture<Void> rollback(TransactionId transactionId) { public CompletableFuture<Void> rollback(TransactionId transactionId) {
return client.submit(new TransactionRollback(transactionId)) return submit(new TransactionRollback(transactionId))
.thenApply(v -> null); .thenApply(v -> null);
} }
@Override @Override
public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) { public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK); return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
} }
@Override @Override
@ -308,4 +311,11 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
public Collection<Consumer<Status>> statusChangeListeners() { public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners); return ImmutableSet.copyOf(statusChangeListeners);
} }
<T> CompletableFuture<T> submit(Operation<T> command) {
if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
return Tools.exceptionalFuture(new StorageException.Unavailable());
}
return client.submit(command);
}
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.onosproject.store.primitives.resources.impl; package org.onosproject.store.primitives.resources.impl;
import io.atomix.copycat.Operation;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo; import io.atomix.resource.ResourceTypeInfo;
@ -27,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.onlab.util.Tools;
import org.onosproject.cluster.Leadership; import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId; import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change; import org.onosproject.event.Change;
@ -40,6 +42,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorComman
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten; import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw; import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
import org.onosproject.store.service.AsyncLeaderElector; import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.StorageException;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
@ -67,7 +70,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
super(client, properties); super(client, properties);
cache = CacheBuilder.newBuilder() cache = CacheBuilder.newBuilder()
.maximumSize(1000) .maximumSize(1000)
.build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic)))); .build(CacheLoader.from(topic -> submit(new GetLeadership(topic))));
cacheUpdater = change -> { cacheUpdater = change -> {
Leadership leadership = change.newValue(); Leadership leadership = change.newValue();
@ -110,27 +113,27 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
@Override @Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) { public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); return submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
} }
@Override @Override
public CompletableFuture<Void> withdraw(String topic) { public CompletableFuture<Void> withdraw(String topic) {
return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic)); return submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
} }
@Override @Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) { public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); return submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
} }
@Override @Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) { public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); return submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
} }
@Override @Override
public CompletableFuture<Void> evict(NodeId nodeId) { public CompletableFuture<Void> evict(NodeId nodeId) {
return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId)); return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
} }
@Override @Override
@ -145,17 +148,17 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
@Override @Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() { public CompletableFuture<Map<String, Leadership>> getLeaderships() {
return client.submit(new GetAllLeaderships()); return submit(new GetAllLeaderships());
} }
public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) { public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
return client.submit(new GetElectedTopics(nodeId)); return submit(new GetElectedTopics(nodeId));
} }
@Override @Override
public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) { public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.isEmpty()) { if (leadershipChangeListeners.isEmpty()) {
return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer)); return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
} else { } else {
leadershipChangeListeners.add(consumer); leadershipChangeListeners.add(consumer);
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
@ -165,7 +168,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
@Override @Override
public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) { public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) { if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
return client.submit(new Unlisten()).thenApply(v -> null); return submit(new Unlisten()).thenApply(v -> null);
} }
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
@ -184,4 +187,11 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
public Collection<Consumer<Status>> statusChangeListeners() { public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners); return ImmutableSet.copyOf(statusChangeListeners);
} }
<T> CompletableFuture<T> submit(Operation<T> command) {
if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
return Tools.exceptionalFuture(new StorageException.Unavailable());
}
return client.submit(command);
}
} }