From 15b8ef5bec05445d5448ca8b0c20c147331e9539 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Tue, 2 Feb 2016 17:35:05 -0800 Subject: [PATCH] PartitionManager + StoragePartition{Service,Client} implementation classes Change-Id: I2125c5678c760e9ed9fc856d1f9ba2ac4e4a3496 --- .../store/primitives/impl/Managed.java | 50 ++++++ .../primitives/impl/PartitionManager.java | 140 ++++++++++++++++ .../primitives/impl/StoragePartition.java | 149 ++++++++++++++++++ .../impl/StoragePartitionClient.java | 147 +++++++++++++++++ .../impl/StoragePartitionServer.java | 127 +++++++++++++++ 5 files changed, 613 insertions(+) create mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Managed.java create mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java create mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java create mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java create mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Managed.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Managed.java new file mode 100644 index 0000000000..193b0b5c8e --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Managed.java @@ -0,0 +1,50 @@ +/* + * Copyright 2016 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import java.util.concurrent.CompletableFuture; + +/** + * Interface for types that can be asynchronously opened and closed. + */ +public interface Managed { + + /** + * Opens the managed object. + * + * @return A completable future to be completed once the object has been opened. + */ + CompletableFuture open(); + + /** + * Closes the managed object. + * + * @return A completable future to be completed once the object has been closed. + */ + CompletableFuture close(); + + /** + * Return {@code true} if the managed object is open. + * @return {@code true} if open + */ + boolean isOpen(); + + /** + * Return {@code true} if the managed object is closed. + * @return {@code true} if closed + */ + boolean isClosed(); +} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java new file mode 100644 index 0000000000..30d7fd814a --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java @@ -0,0 +1,140 @@ +/* + * Copyright 2016 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.store.primitives.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.io.File; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.Tools; +import org.onosproject.cluster.ClusterMetadataService; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.cluster.PartitionId; +import org.onosproject.event.AbstractListenerManager; +import org.onosproject.store.cluster.messaging.MessagingService; +import org.onosproject.store.primitives.DistributedPrimitiveCreator; +import org.onosproject.store.primitives.PartitionAdminService; +import org.onosproject.store.primitives.PartitionEvent; +import org.onosproject.store.primitives.PartitionEventListener; +import org.onosproject.store.primitives.PartitionService; +import org.slf4j.Logger; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +/** + * Implementation of {@code PartitionService} and {@code PartitionAdminService}. + */ +@Component +@Service +public class PartitionManager extends AbstractListenerManager + implements PartitionService, PartitionAdminService { + + private final Logger log = getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MessagingService messagingService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterMetadataService metadataService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + Map partitions = Maps.newConcurrentMap(); + + @Activate + public void activate() { + eventDispatcher.addSink(PartitionEvent.class, listenerRegistry); + + metadataService.getClusterMetadata() + .getPartitions() + .stream() + .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition, + messagingService, + clusterService, + CatalystSerializers.getSerializer(), + new File(System.getProperty("karaf.data") + "/data/" + partition.getId())))); + + CompletableFuture openFuture = CompletableFuture.allOf(partitions.values() + .stream() + .map(StoragePartition::open) + .toArray(CompletableFuture[]::new)); + openFuture.join(); + log.info("Started"); + } + + public void deactivate() { + eventDispatcher.removeSink(PartitionEvent.class); + + CompletableFuture closeFuture = CompletableFuture.allOf(partitions.values() + .stream() + .map(StoragePartition::close) + .toArray(CompletableFuture[]::new)); + closeFuture.join(); + log.info("Stopped"); + } + + @Override + public CompletableFuture leave(PartitionId partitionId) { + // TODO: Implement + return Tools.exceptionalFuture(new UnsupportedOperationException()); + } + + @Override + public CompletableFuture join(PartitionId partitionId) { + // TODO: Implement + return Tools.exceptionalFuture(new UnsupportedOperationException()); + } + + @Override + public int getNumberOfPartitions() { + return partitions.size(); + } + + @Override + public Set getAllPartitionIds() { + return partitions.keySet(); + } + + @Override + public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) { + return partitions.get(partitionId).client(); + } + + @Override + public Set getConfiguredMembers(PartitionId partitionId) { + StoragePartition partition = partitions.get(partitionId); + return ImmutableSet.copyOf(partition.getMembers()); + } + + @Override + public Set getActiveMembersMembers(PartitionId partitionId) { + // TODO: This needs to query metadata to determine currently active + // members of partition + return getConfiguredMembers(partitionId); + } +} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java new file mode 100644 index 0000000000..dd49e23d85 --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java @@ -0,0 +1,149 @@ +/* + * Copyright 2016 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import io.atomix.catalyst.serializer.Serializer; +import io.atomix.catalyst.transport.Address; +import io.atomix.resource.ResourceType; +import io.atomix.variables.DistributedLong; + +import java.io.File; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.DefaultPartition; +import org.onosproject.cluster.NodeId; +import org.onosproject.cluster.Partition; +import org.onosproject.store.cluster.messaging.MessagingService; +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap; + +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableSet; + +/** + * Storage partition. + */ +public class StoragePartition extends DefaultPartition implements Managed { + + private final AtomicBoolean isOpened = new AtomicBoolean(false); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final Serializer serializer; + private final MessagingService messagingService; + private final ClusterService clusterService; + private final File logFolder; + private static final Collection RESOURCE_TYPES = ImmutableSet.of( + new ResourceType(DistributedLong.class), + new ResourceType(AtomixConsistentMap.class)); + + private NodeId localNodeId; + private Optional server = Optional.empty(); + private StoragePartitionClient client; + + public StoragePartition(Partition partition, + MessagingService messagingService, + ClusterService clusterService, + Serializer serializer, + File logFolder) { + super(partition); + this.messagingService = messagingService; + this.clusterService = clusterService; + this.localNodeId = clusterService.getLocalNode().id(); + this.serializer = serializer; + this.logFolder = logFolder; + } + + public StoragePartitionClient client() { + return client; + } + + @Override + public CompletableFuture open() { + return openServer().thenAccept(s -> server = Optional.of(s)) + .thenCompose(v-> openClient()) + .thenAccept(v -> isOpened.set(true)) + .thenApply(v -> null); + } + + @Override + public CompletableFuture close() { + return closeClient().thenCompose(v -> closeServer()) + .thenAccept(v -> isClosed.set(true)) + .thenApply(v -> null); + } + + public Collection
getMemberAddresses() { + return Collections2.transform(getMembers(), this::toAddress); + } + + private CompletableFuture openServer() { + if (!getMembers().contains(localNodeId)) { + return CompletableFuture.completedFuture(null); + } + StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId), + this, + serializer, + () -> new CopycatTransport(CopycatTransport.Mode.SERVER, + getId(), + messagingService), + RESOURCE_TYPES, + logFolder); + return server.open().thenApply(v -> server); + } + + private CompletableFuture openClient() { + client = new StoragePartitionClient(this, + serializer, + new CopycatTransport(CopycatTransport.Mode.CLIENT, + getId(), + messagingService), + RESOURCE_TYPES); + return client.open().thenApply(v -> client); + } + + private CompletableFuture closeServer() { + if (server.isPresent()) { + return server.get().close(); + } else { + return CompletableFuture.completedFuture(null); + } + } + + private CompletableFuture closeClient() { + if (client != null) { + return client.close(); + } + return CompletableFuture.completedFuture(null); + } + + private Address toAddress(NodeId nodeId) { + ControllerNode node = clusterService.getNode(nodeId); + return new Address(node.ip().toString(), node.tcpPort()); + } + + @Override + public boolean isOpen() { + return !isClosed.get() && isOpened.get(); + } + + @Override + public boolean isClosed() { + return isOpened.get() && isClosed.get(); + } +} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java new file mode 100644 index 0000000000..ffc2333f50 --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java @@ -0,0 +1,147 @@ +/* + * Copyright 2016 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import io.atomix.Atomix; +import io.atomix.AtomixClient; +import io.atomix.catalyst.transport.Transport; +import io.atomix.resource.ResourceType; +import io.atomix.variables.DistributedLong; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +import org.onlab.util.HexString; +import org.onosproject.store.primitives.DistributedPrimitiveCreator; +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap; +import org.onosproject.store.primitives.resources.impl.AtomixCounter; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.AsyncAtomicCounter; +import org.onosproject.store.service.AsyncAtomicValue; +import org.onosproject.store.service.AsyncConsistentMap; +import org.onosproject.store.service.AsyncDistributedSet; +import org.onosproject.store.service.AsyncLeaderElector; +import org.onosproject.store.service.DistributedQueue; +import org.onosproject.store.service.Serializer; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableSet; + +/** + * StoragePartition client. + */ +public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed { + + private final StoragePartition partition; + private final Transport transport; + private final io.atomix.catalyst.serializer.Serializer serializer; + private final Collection resourceTypes; + private Atomix client; + private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values"; + private final Supplier> onosAtomicValuesMap = + Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME, + Serializer.using(KryoNamespaces.BASIC))); + + public StoragePartitionClient(StoragePartition partition, + io.atomix.catalyst.serializer.Serializer serializer, + Transport transport, + Collection resourceTypes) { + this.partition = partition; + this.serializer = serializer; + this.transport = transport; + this.resourceTypes = ImmutableSet.copyOf(resourceTypes); + } + + @Override + public CompletableFuture open() { + if (client != null && client.isOpen()) { + return CompletableFuture.completedFuture(null); + } + synchronized (StoragePartitionClient.this) { + client = AtomixClient.builder(partition.getMemberAddresses()) + .withSerializer(serializer.clone()) + .withResourceResolver(r -> { + resourceTypes.forEach(r::register); + }) + .withTransport(transport) + .build(); + } + return client.open().thenApply(v -> null); + } + + @Override + public CompletableFuture close() { + return client != null ? client.close() : CompletableFuture.completedFuture(null); + } + + @Override + public AsyncConsistentMap newAsyncConsistentMap(String name, Serializer serializer) { + AsyncConsistentMap rawMap = + new DelegatingAsyncConsistentMap(client.get(name, AtomixConsistentMap.class).join()) { + @Override + public String name() { + return name; + } + }; + AsyncConsistentMap transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap, + key -> HexString.toHexString(serializer.encode(key)), + string -> serializer.decode(HexString.fromHexString(string)), + value -> value == null ? null : serializer.encode(value), + bytes -> serializer.decode(bytes)); + + return DistributedPrimitives.newCachingMap(transcodedMap); + } + + @Override + public AsyncDistributedSet newAsyncDistributedSet(String name, Serializer serializer) { + return DistributedPrimitives.newSetFromMap(this.newAsyncConsistentMap(name, serializer)); + } + + @Override + public AsyncAtomicCounter newAsyncCounter(String name) { + DistributedLong distributedLong = client.get(name, DistributedLong.class).join(); + return new AtomixCounter(name, distributedLong); + } + + @Override + public AsyncAtomicValue newAsyncAtomicValue(String name, Serializer serializer) { + return new DefaultAsyncAtomicValue<>(name, + serializer, + onosAtomicValuesMap.get()); + } + + @Override + public DistributedQueue newDistributedQueue(String name, Serializer serializer) { + // TODO: Implement + throw new UnsupportedOperationException(); + } + + @Override + public AsyncLeaderElector newAsyncLeaderElector(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOpen() { + return client.isOpen(); + } + + @Override + public boolean isClosed() { + return client.isClosed(); + } +} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java new file mode 100644 index 0000000000..6ffc677f93 --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java @@ -0,0 +1,127 @@ +/* + * Copyright 2016 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import io.atomix.catalyst.serializer.Serializer; +import io.atomix.catalyst.transport.Address; +import io.atomix.catalyst.transport.Transport; +import io.atomix.copycat.server.CopycatServer; +import io.atomix.copycat.server.storage.Storage; +import io.atomix.copycat.server.storage.StorageLevel; +import io.atomix.manager.state.ResourceManagerState; +import io.atomix.resource.ResourceRegistry; +import io.atomix.resource.ResourceType; +import io.atomix.resource.ResourceTypeResolver; +import io.atomix.resource.ServiceLoaderResourceResolver; + +import java.io.File; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import org.onosproject.cluster.NodeId; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +/** + * {@link StoragePartition} server. + */ +public class StoragePartitionServer implements Managed { + + private static final int MAX_ENTRIES_PER_LOG_SEGMENT = 32768; + private final StoragePartition partition; + private final Address localAddress; + private final Supplier transport; + private final Serializer serializer; + private final File dataFolder; + private final Collection resourceTypes; + private CopycatServer server; + + public StoragePartitionServer(Address localAddress, + StoragePartition partition, + Serializer serializer, + Supplier transport, + Collection resourceTypes, + File dataFolder) { + this.partition = partition; + this.localAddress = localAddress; + this.serializer = serializer; + this.transport = transport; + this.resourceTypes = ImmutableSet.copyOf(resourceTypes); + this.dataFolder = dataFolder; + } + + @Override + public CompletableFuture open() { + CompletableFuture serverOpenFuture; + if (partition.getMemberAddresses().contains(localAddress)) { + if (server != null && server.isOpen()) { + return CompletableFuture.completedFuture(null); + } + synchronized (this) { + server = server(); + } + serverOpenFuture = server.open(); + } else { + serverOpenFuture = CompletableFuture.completedFuture(null); + } + return serverOpenFuture.thenApply(v -> null); + } + + @Override + public CompletableFuture close() { + // We do not close the server because doing so is equivalent to this node + // leaving the cluster and we don't want that here. + // The Raft protocol should take care of servers leaving unannounced. + return CompletableFuture.completedFuture(null); + } + + private CopycatServer server() { + ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver(); + ResourceRegistry registry = new ResourceRegistry(); + resourceTypes.forEach(registry::register); + resourceResolver.resolve(registry); + return CopycatServer.builder(localAddress, partition.getMemberAddresses()) + .withName("partition-" + partition.getId()) + .withSerializer(serializer.clone()) + .withTransport(transport.get()) + .withStateMachine(() -> new ResourceManagerState(registry)) + .withStorage(Storage.builder() + // FIXME: StorageLevel should be DISK + .withStorageLevel(StorageLevel.MEMORY) + .withSerializer(serializer.clone()) + .withDirectory(dataFolder) + .withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT) + .build()) + .build(); + } + + public Set configuredMembers() { + return Sets.newHashSet(partition.getMembers()); + } + + @Override + public boolean isOpen() { + return server.isOpen(); + } + + @Override + public boolean isClosed() { + return server.isClosed(); + } +}