diff --git a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java index 5a55c9965c..a0967b3ab3 100644 --- a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java +++ b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java @@ -22,8 +22,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.karaf.shell.commands.Command; import org.joda.time.DateTime; import org.onlab.util.Tools; +import org.onosproject.cluster.ClusterAdminService; import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.MembershipAdminService; import org.onosproject.core.Version; import org.onosproject.utils.Comparators; @@ -32,7 +32,6 @@ import java.util.List; import static com.google.common.collect.Lists.newArrayList; - /** * Lists all controller cluster nodes. */ @@ -44,7 +43,7 @@ public class NodesListCommand extends AbstractShellCommand { @Override protected void execute() { - MembershipAdminService service = get(MembershipAdminService.class); + ClusterAdminService service = get(ClusterAdminService.class); List nodes = newArrayList(service.getNodes()); Collections.sort(nodes, Comparators.NODE_COMPARATOR); if (outputJson()) { @@ -68,7 +67,7 @@ public class NodesListCommand extends AbstractShellCommand { } // Produces JSON structure. - private JsonNode json(MembershipAdminService service, List nodes) { + private JsonNode json(ClusterAdminService service, List nodes) { ObjectMapper mapper = new ObjectMapper(); ArrayNode result = mapper.createArrayNode(); ControllerNode self = service.getLocalNode(); diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java index e68087718f..d1c4325f6f 100644 --- a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java +++ b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java @@ -15,14 +15,56 @@ */ package org.onosproject.cluster; +import java.util.Set; + +import org.onlab.packet.IpAddress; + /** * Service for administering the cluster node membership. - *

- * This service has a view of the cluster membership that is isolated to the local node's version during upgrades. - * For an equivalent service that has control over all nodes during an upgrade use - * {@link UnifiedClusterAdminService}. - * - * @see UnifiedClusterAdminService */ -public interface ClusterAdminService extends MembershipAdminService { +public interface ClusterAdminService extends ClusterService { + + /** + * Forms cluster configuration based on the specified set of node + * information.  This method resets and restarts the controller + * instance. + * + * @param nodes set of nodes that form the cluster + */ + void formCluster(Set nodes); + + /** + * Forms cluster configuration based on the specified set of node + * information.  This method resets and restarts the controller + * instance. + * + * @param nodes set of nodes that form the cluster + * @param partitionSize number of nodes to compose a partition + */ + void formCluster(Set nodes, int partitionSize); + + /** + * Adds a new controller node to the cluster. + * + * @param nodeId controller node identifier + * @param ip node IP listen address + * @param tcpPort tcp listen port + * @return newly added node + */ + ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort); + + /** + * Removes the specified node from the cluster node list. + * + * @param nodeId controller node identifier + */ + void removeNode(NodeId nodeId); + + /** + * Marks the current node as fully started or not. + * + * @param started true indicates all components have been started + */ + void markFullyStarted(boolean started); + } diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java index f17e4d9e43..e4cf7ecc24 100644 --- a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java +++ b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java @@ -15,13 +15,63 @@ */ package org.onosproject.cluster; +import java.util.Set; + +import org.joda.time.DateTime; +import org.onosproject.core.Version; +import org.onosproject.event.ListenerService; + /** * Service for obtaining information about the individual nodes within the controller cluster. - *

- * This service's view of the nodes in the cluster is isolated to a single version of the software. During upgrades, - * when multiple versions of the software are running in the same cluster, users of this service will only be able - * to see nodes running the same version as the local node. This is useful for limiting communication to nodes running - * the same version of the software. */ -public interface ClusterService extends MembershipService { +public interface ClusterService extends ListenerService { + + /** + * Returns the local controller node. + * + * @return local controller node + */ + ControllerNode getLocalNode(); + + /** + * Returns the set of current cluster members. + * + * @return set of cluster members + */ + Set getNodes(); + + /** + * Returns the specified controller node. + * + * @param nodeId controller node identifier + * @return controller node + */ + ControllerNode getNode(NodeId nodeId); + + /** + * Returns the availability state of the specified controller node. Note + * that this does not imply that all the core and application components + * have been fully activated; only that the node has joined the cluster. + * + * @param nodeId controller node identifier + * @return availability state + */ + ControllerNode.State getState(NodeId nodeId); + + /** + * Returns the version of the given controller node. + * + * @param nodeId controller node identifier + * @return controller version + */ + Version getVersion(NodeId nodeId); + + /** + * Returns the system time when the availability state was last updated. + * + * @param nodeId controller node identifier + * @return system time when the availability state was last updated. + */ + DateTime getLastUpdated(NodeId nodeId); + } diff --git a/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java b/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java index 4983652d90..1d1a3ec45d 100644 --- a/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java +++ b/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java @@ -74,7 +74,6 @@ public interface ControllerNode { */ IpAddress ip(); - /** * Returns the TCP port on which the node listens for connections. * diff --git a/core/api/src/main/java/org/onosproject/cluster/Member.java b/core/api/src/main/java/org/onosproject/cluster/Member.java new file mode 100644 index 0000000000..796299b81c --- /dev/null +++ b/core/api/src/main/java/org/onosproject/cluster/Member.java @@ -0,0 +1,83 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.cluster; + +import java.util.Objects; + +import org.onosproject.core.Version; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Controller member identity. + */ +public final class Member { + + private final NodeId nodeId; + private final Version version; + + /** + * Creates a new cluster member identifier from the specified string. + * + * @param nodeId node identifier + * @param version node version + */ + public Member(NodeId nodeId, Version version) { + this.nodeId = checkNotNull(nodeId); + this.version = version; + } + + /** + * Returns the node identifier. + * + * @return the node identifier + */ + public NodeId nodeId() { + return nodeId; + } + + /** + * Returns the node version. + * + * @return the node version + */ + public Version version() { + return version; + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, version); + } + + @Override + public boolean equals(Object object) { + if (object instanceof Member) { + Member member = (Member) object; + return member.nodeId.equals(nodeId) && Objects.equals(member.version, version); + } + return false; + } + + @Override + public String toString() { + return toStringHelper(this) + .add("nodeId", nodeId) + .add("version", version) + .toString(); + } +} diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java deleted file mode 100644 index 908a96333e..0000000000 --- a/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.cluster; - -import java.util.Set; - -import org.onlab.packet.IpAddress; - -/** - * Service for administering the cluster node membership. - */ -public interface MembershipAdminService extends MembershipService { - - /** - * Forms cluster configuration based on the specified set of node - * information.  This method resets and restarts the controller - * instance. - * - * @param nodes set of nodes that form the cluster - */ - void formCluster(Set nodes); - - /** - * Forms cluster configuration based on the specified set of node - * information.  This method resets and restarts the controller - * instance. - * - * @param nodes set of nodes that form the cluster - * @param partitionSize number of nodes to compose a partition - */ - void formCluster(Set nodes, int partitionSize); - - /** - * Adds a new controller node to the cluster. - * - * @param nodeId controller node identifier - * @param ip node IP listen address - * @param tcpPort tcp listen port - * @return newly added node - */ - ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort); - - /** - * Removes the specified node from the cluster node list. - * - * @param nodeId controller node identifier - */ - void removeNode(NodeId nodeId); - - /** - * Marks the current node as fully started or not. - * - * @param started true indicates all components have been started - */ - void markFullyStarted(boolean started); - -} diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipGroup.java b/core/api/src/main/java/org/onosproject/cluster/MembershipGroup.java new file mode 100644 index 0000000000..6526a1d305 --- /dev/null +++ b/core/api/src/main/java/org/onosproject/cluster/MembershipGroup.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.cluster; + +import java.util.Set; + +import org.onosproject.core.Version; + +import static com.google.common.base.MoreObjects.toStringHelper; + +/** + * Membership group. + */ +public class MembershipGroup { + private final Version version; + private final Set members; + + public MembershipGroup(Version version, Set members) { + this.version = version; + this.members = members; + } + + /** + * Returns the group version. + * + * @return the group version + */ + public Version version() { + return version; + } + + /** + * Returns the set of members in the group. + * + * @return the set of members in the group + */ + public Set members() { + return members; + } + + @Override + public String toString() { + return toStringHelper(this) + .add("version", version) + .add("members", members) + .toString(); + } +} diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java index 7b366f6e82..f56daf5932 100644 --- a/core/api/src/main/java/org/onosproject/cluster/MembershipService.java +++ b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java @@ -15,32 +15,59 @@ */ package org.onosproject.cluster; +import java.util.Collection; import java.util.Set; -import org.joda.time.DateTime; import org.onosproject.core.Version; -import org.onosproject.event.ListenerService; /** - * Service for obtaining information about the individual nodes within - * the controller cluster. + * Service for obtaining information about the individual members of the controller cluster. */ -public interface MembershipService - extends ListenerService { +public interface MembershipService { /** - * Returns the local controller node. + * Returns the local member. * - * @return local controller node + * @return local member */ - ControllerNode getLocalNode(); + Member getLocalMember(); /** - * Returns the set of current cluster members. + * Returns the group associated with the local member. * - * @return set of cluster members + * @return the group associated with the local member */ - Set getNodes(); + MembershipGroup getLocalGroup(); + + /** + * Returns the set of current cluster members in the local group. + * + * @return set of cluster members in the local group + */ + Set getMembers(); + + /** + * Returns the set of membership groups in the cluster. + * + * @return the set of membership groups in the cluster + */ + Collection getGroups(); + + /** + * Returns the membership group for the given version. + * + * @param version the version for which to return the membership group + * @return the membership group for the given version + */ + MembershipGroup getGroup(Version version); + + /** + * Returns the set of members in the given version. + * + * @param version the version for which to return the set of members + * @return the set of members for the given version + */ + Set getMembers(Version version); /** * Returns the specified controller node. @@ -48,32 +75,6 @@ public interface MembershipService * @param nodeId controller node identifier * @return controller node */ - ControllerNode getNode(NodeId nodeId); - - /** - * Returns the availability state of the specified controller node. Note - * that this does not imply that all the core and application components - * have been fully activated; only that the node has joined the cluster. - * - * @param nodeId controller node identifier - * @return availability state - */ - ControllerNode.State getState(NodeId nodeId); - - /** - * Returns the version of the given controller node. - * - * @param nodeId controller node identifier - * @return controller version - */ - Version getVersion(NodeId nodeId); - - /** - * Returns the system time when the availability state was last updated. - * - * @param nodeId controller node identifier - * @return system time when the availability state was last updated. - */ - DateTime getLastUpdated(NodeId nodeId); + Member getMember(NodeId nodeId); } diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java deleted file mode 100644 index 9fb7407791..0000000000 --- a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.cluster; - -import com.google.common.annotations.Beta; - -/** - * Cluster membership administration service that supports modification of all nodes during an upgrade. - */ -@Beta -public interface UnifiedClusterAdminService extends MembershipAdminService { -} diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java deleted file mode 100644 index f690f3edc7..0000000000 --- a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.cluster; - -import com.google.common.annotations.Beta; - -/** - * Unified multi-version cluster membership service. - *

- * During upgrades, the nodes within a cluster may be running multiple versions of the software. - * This service has a view of the entire cluster running any version. Users of this service must be careful when - * communicating with nodes described by this service as compatibility issues can result from communicating across - * versions. For an equivalent service that has an isolated view of the cluster, see {@link ClusterService}. - * - * @see ClusterService - */ -@Beta -public interface UnifiedClusterService extends MembershipService { -} diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java index dfc558d747..7aa5ac6261 100644 --- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java +++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java @@ -15,15 +15,152 @@ */ package org.onosproject.store.cluster.messaging; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.onosproject.cluster.NodeId; + /** * Service for assisting communications between controller cluster nodes. - *

- * Communication via this service is isolated to nodes running a single version of the software. During upgrades, when - * nodes may be running multiple versions simultaneously, this service prevents nodes running different versions of - * the software from communicating with each other, thus avoiding compatibility issues. For an equivalent cross-version - * compatible service, see {@link UnifiedClusterCommunicationService}. - * - * @see UnifiedClusterCommunicationService */ -public interface ClusterCommunicationService extends ClusterCommunicator { +public interface ClusterCommunicationService { + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param subscriber message subscriber + * @param executor executor to use for running handler. + * @deprecated in Cardinal Release + */ + @Deprecated + void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor); + + /** + * Broadcasts a message to all controller nodes. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param message type + */ + void broadcast(M message, + MessageSubject subject, + Function encoder); + + /** + * Broadcasts a message to all controller nodes including self. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param message type + */ + void broadcastIncludeSelf(M message, + MessageSubject subject, + Function encoder); + + /** + * Sends a message to the specified controller node. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param toNodeId destination node identifier + * @param message type + * @return future that is completed when the message is sent + */ + CompletableFuture unicast(M message, + MessageSubject subject, + Function encoder, + NodeId toNodeId); + + /** + * Multicasts a message to a set of controller nodes. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param nodeIds recipient node identifiers + * @param message type + */ + void multicast(M message, + MessageSubject subject, + Function encoder, + Set nodeIds); + + /** + * Sends a message and expects a reply. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding request to byte[] + * @param decoder function for decoding response from byte[] + * @param toNodeId recipient node identifier + * @param request type + * @param reply type + * @return reply future + */ + CompletableFuture sendAndReceive(M message, + MessageSubject subject, + Function encoder, + Function decoder, + NodeId toNodeId); + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param decoder decoder for resurrecting incoming message + * @param handler handler function that processes the incoming message and produces a reply + * @param encoder encoder for serializing reply + * @param executor executor to run this handler on + * @param incoming message type + * @param reply message type + */ + void addSubscriber(MessageSubject subject, + Function decoder, + Function handler, + Function encoder, + Executor executor); + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param decoder decoder for resurrecting incoming message + * @param handler handler function that processes the incoming message and produces a reply + * @param encoder encoder for serializing reply + * @param incoming message type + * @param reply message type + */ + void addSubscriber(MessageSubject subject, + Function decoder, + Function> handler, + Function encoder); + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param decoder decoder to resurrecting incoming message + * @param handler handler for handling message + * @param executor executor to run this handler on + * @param incoming message type + */ + void addSubscriber(MessageSubject subject, + Function decoder, + Consumer handler, + Executor executor); + + /** + * Removes a subscriber for the specified message subject. + * + * @param subject message subject + */ + void removeSubscriber(MessageSubject subject); } diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java deleted file mode 100644 index 7d2480e1e9..0000000000 --- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.cluster.messaging; - -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.onosproject.cluster.NodeId; - -/** - * Service for assisting communications between controller cluster nodes. - */ -public interface ClusterCommunicator { - - /** - * Adds a new subscriber for the specified message subject. - * - * @param subject message subject - * @param subscriber message subscriber - * @param executor executor to use for running handler. - * @deprecated in Cardinal Release - */ - @Deprecated - void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor); - - /** - * Broadcasts a message to all controller nodes. - * - * @param message message to send - * @param subject message subject - * @param encoder function for encoding message to byte[] - * @param message type - */ - void broadcast(M message, - MessageSubject subject, - Function encoder); - - /** - * Broadcasts a message to all controller nodes including self. - * - * @param message message to send - * @param subject message subject - * @param encoder function for encoding message to byte[] - * @param message type - */ - void broadcastIncludeSelf(M message, - MessageSubject subject, - Function encoder); - - /** - * Sends a message to the specified controller node. - * - * @param message message to send - * @param subject message subject - * @param encoder function for encoding message to byte[] - * @param toNodeId destination node identifier - * @param message type - * @return future that is completed when the message is sent - */ - CompletableFuture unicast(M message, - MessageSubject subject, - Function encoder, - NodeId toNodeId); - - /** - * Multicasts a message to a set of controller nodes. - * - * @param message message to send - * @param subject message subject - * @param encoder function for encoding message to byte[] - * @param nodeIds recipient node identifiers - * @param message type - */ - void multicast(M message, - MessageSubject subject, - Function encoder, - Set nodeIds); - - /** - * Sends a message and expects a reply. - * - * @param message message to send - * @param subject message subject - * @param encoder function for encoding request to byte[] - * @param decoder function for decoding response from byte[] - * @param toNodeId recipient node identifier - * @param request type - * @param reply type - * @return reply future - */ - CompletableFuture sendAndReceive(M message, - MessageSubject subject, - Function encoder, - Function decoder, - NodeId toNodeId); - - /** - * Adds a new subscriber for the specified message subject. - * - * @param subject message subject - * @param decoder decoder for resurrecting incoming message - * @param handler handler function that processes the incoming message and produces a reply - * @param encoder encoder for serializing reply - * @param executor executor to run this handler on - * @param incoming message type - * @param reply message type - */ - void addSubscriber(MessageSubject subject, - Function decoder, - Function handler, - Function encoder, - Executor executor); - - /** - * Adds a new subscriber for the specified message subject. - * - * @param subject message subject - * @param decoder decoder for resurrecting incoming message - * @param handler handler function that processes the incoming message and produces a reply - * @param encoder encoder for serializing reply - * @param incoming message type - * @param reply message type - */ - void addSubscriber(MessageSubject subject, - Function decoder, - Function> handler, - Function encoder); - - /** - * Adds a new subscriber for the specified message subject. - * - * @param subject message subject - * @param decoder decoder to resurrecting incoming message - * @param handler handler for handling message - * @param executor executor to run this handler on - * @param incoming message type - */ - void addSubscriber(MessageSubject subject, - Function decoder, - Consumer handler, - Executor executor); - - /** - * Removes a subscriber for the specified message subject. - * - * @param subject message subject - */ - void removeSubscriber(MessageSubject subject); -} diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java deleted file mode 100644 index 042c8032f3..0000000000 --- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.cluster.messaging; - -import com.google.common.annotations.Beta; - -/** - * Service for unified communication across controller nodes running multiple software versions. - *

- * This service supports communicating across nodes running different versions of the software simultaneously. During - * upgrades, when nodes may be running a mixture of versions, this service can be used to coordinate across those - * versions. But users of this service must be extremely careful to preserve backward/forward compatibility for - * messages sent across versions. Encoders and decoders used for messages sent/received on this service should - * support evolving schemas. - */ -@Beta -public interface UnifiedClusterCommunicationService extends ClusterCommunicator { -} diff --git a/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/MembershipServiceAdapter.java similarity index 62% rename from core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java rename to core/api/src/test/java/org/onosproject/cluster/MembershipServiceAdapter.java index aca74ecc62..172c82c6ed 100644 --- a/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java +++ b/core/api/src/test/java/org/onosproject/cluster/MembershipServiceAdapter.java @@ -15,52 +15,47 @@ */ package org.onosproject.cluster; +import java.util.Collection; import java.util.Set; -import org.joda.time.DateTime; import org.onosproject.core.Version; /** - * Compatible cluster service adapter. + * Membership service adapter. */ -public class UnifiedClusterServiceAdapter implements UnifiedClusterService { +public class MembershipServiceAdapter implements MembershipService { @Override - public ControllerNode getLocalNode() { + public Member getLocalMember() { return null; } @Override - public Set getNodes() { + public MembershipGroup getLocalGroup() { return null; } @Override - public ControllerNode getNode(NodeId nodeId) { + public Set getMembers() { return null; } @Override - public ControllerNode.State getState(NodeId nodeId) { + public Collection getGroups() { return null; } @Override - public Version getVersion(NodeId nodeId) { + public MembershipGroup getGroup(Version version) { return null; } @Override - public DateTime getLastUpdated(NodeId nodeId) { + public Set getMembers(Version version) { return null; } @Override - public void addListener(ClusterEventListener listener) { - - } - - @Override - public void removeListener(ClusterEventListener listener) { - + public Member getMember(NodeId nodeId) { + return null; } } diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java index df7feff6e0..fc3d802065 100644 --- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java +++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java @@ -15,28 +15,48 @@ */ package org.onosproject.cluster.impl; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Set; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.apache.karaf.system.SystemService; import org.joda.time.DateTime; import org.onlab.packet.IpAddress; +import org.onlab.util.Tools; import org.onosproject.cluster.ClusterAdminService; +import org.onosproject.cluster.ClusterEvent; import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterMetadata; +import org.onosproject.cluster.ClusterMetadataAdminService; +import org.onosproject.cluster.ClusterMetadataDiff; +import org.onosproject.cluster.ClusterMetadataEvent; +import org.onosproject.cluster.ClusterMetadataEventListener; +import org.onosproject.cluster.ClusterMetadataService; import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ClusterStore; +import org.onosproject.cluster.ClusterStoreDelegate; import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.UnifiedClusterAdminService; -import org.onosproject.cluster.UnifiedClusterService; +import org.onosproject.cluster.DefaultPartition; import org.onosproject.cluster.NodeId; +import org.onosproject.cluster.Partition; +import org.onosproject.cluster.PartitionId; import org.onosproject.core.Version; -import org.onosproject.core.VersionService; +import org.onosproject.event.AbstractListenerManager; import org.slf4j.Logger; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static org.onosproject.security.AppGuard.checkPermission; import static org.onosproject.security.AppPermission.Type.CLUSTER_READ; import static org.slf4j.LoggerFactory.getLogger; @@ -46,122 +66,172 @@ import static org.slf4j.LoggerFactory.getLogger; */ @Component(immediate = true) @Service -public class ClusterManager implements ClusterService, ClusterAdminService { +public class ClusterManager + extends AbstractListenerManager + implements ClusterService, ClusterAdminService { + public static final String INSTANCE_ID_NULL = "Instance ID cannot be null"; + private static final int DEFAULT_PARTITION_SIZE = 3; private final Logger log = getLogger(getClass()); - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - private UnifiedClusterService clusterService; + private ClusterStoreDelegate delegate = new InternalStoreDelegate(); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - private UnifiedClusterAdminService clusterAdminService; + protected ClusterMetadataService clusterMetadataService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - private VersionService versionService; + protected ClusterMetadataAdminService clusterMetadataAdminService; - private Version version; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterStore store; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected SystemService systemService; + + private final AtomicReference currentMetadata = new AtomicReference<>(); + private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener(); @Activate public void activate() { - version = versionService.version(); + store.setDelegate(delegate); + eventDispatcher.addSink(ClusterEvent.class, listenerRegistry); + clusterMetadataService.addListener(metadataListener); + processMetadata(clusterMetadataService.getClusterMetadata()); log.info("Started"); } @Deactivate public void deactivate() { + clusterMetadataService.removeListener(metadataListener); + store.unsetDelegate(delegate); + eventDispatcher.removeSink(ClusterEvent.class); log.info("Stopped"); } @Override public ControllerNode getLocalNode() { checkPermission(CLUSTER_READ); - return clusterService.getLocalNode(); + return store.getLocalNode(); } @Override public Set getNodes() { checkPermission(CLUSTER_READ); - return clusterService.getNodes() - .stream() - .filter(node -> clusterService.getVersion(node.id()).equals(version)) - .collect(Collectors.toSet()); + return store.getNodes(); } @Override public ControllerNode getNode(NodeId nodeId) { checkPermission(CLUSTER_READ); - Version nodeVersion = clusterService.getVersion(nodeId); - if (nodeVersion != null && nodeVersion.equals(version)) { - return clusterService.getNode(nodeId); - } - return null; + checkNotNull(nodeId, INSTANCE_ID_NULL); + return store.getNode(nodeId); } @Override public ControllerNode.State getState(NodeId nodeId) { checkPermission(CLUSTER_READ); - Version nodeVersion = clusterService.getVersion(nodeId); - if (nodeVersion != null && nodeVersion.equals(version)) { - return clusterService.getState(nodeId); - } - return null; + checkNotNull(nodeId, INSTANCE_ID_NULL); + return store.getState(nodeId); } @Override public Version getVersion(NodeId nodeId) { checkPermission(CLUSTER_READ); - Version nodeVersion = clusterService.getVersion(nodeId); - if (nodeVersion != null && nodeVersion.equals(version)) { - return nodeVersion; - } - return null; + checkNotNull(nodeId, INSTANCE_ID_NULL); + return store.getVersion(nodeId); } @Override public void markFullyStarted(boolean started) { - clusterAdminService.markFullyStarted(started); + store.markFullyStarted(started); } @Override public DateTime getLastUpdated(NodeId nodeId) { checkPermission(CLUSTER_READ); - Version nodeVersion = clusterService.getVersion(nodeId); - if (nodeVersion != null && nodeVersion.equals(version)) { - return clusterService.getLastUpdated(nodeId); - } - return null; + return store.getLastUpdated(nodeId); } @Override public void formCluster(Set nodes) { - clusterAdminService.formCluster(nodes); + formCluster(nodes, DEFAULT_PARTITION_SIZE); } @Override public void formCluster(Set nodes, int partitionSize) { - clusterAdminService.formCluster(nodes, partitionSize); - } + checkNotNull(nodes, "Nodes cannot be null"); + checkArgument(!nodes.isEmpty(), "Nodes cannot be empty"); - @Override - public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) { - return clusterAdminService.addNode(nodeId, ip, tcpPort); - } - - @Override - public void removeNode(NodeId nodeId) { - Version nodeVersion = clusterService.getVersion(nodeId); - if (nodeVersion != null && nodeVersion.equals(version)) { - clusterAdminService.removeNode(nodeId); + ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize)); + clusterMetadataAdminService.setClusterMetadata(metadata); + try { + log.warn("Shutting down container for cluster reconfiguration!"); + // Clean up persistent state associated with previous cluster configuration. + Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions"); + systemService.reboot("now", SystemService.Swipe.NONE); + } catch (Exception e) { + log.error("Unable to reboot container", e); } } @Override - public void addListener(ClusterEventListener listener) { - clusterService.addListener(listener); + public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) { + checkNotNull(nodeId, INSTANCE_ID_NULL); + checkNotNull(ip, "IP address cannot be null"); + checkArgument(tcpPort > 5000, "TCP port must be > 5000"); + return store.addNode(nodeId, ip, tcpPort); } @Override - public void removeListener(ClusterEventListener listener) { - clusterService.removeListener(listener); + public void removeNode(NodeId nodeId) { + checkNotNull(nodeId, INSTANCE_ID_NULL); + store.removeNode(nodeId); + } + + // Store delegate to re-post events emitted from the store. + private class InternalStoreDelegate implements ClusterStoreDelegate { + @Override + public void notify(ClusterEvent event) { + post(event); + } + } + + private static Set buildDefaultPartitions(Collection nodes, int partitionSize) { + List sorted = new ArrayList<>(nodes); + Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString())); + Set partitions = Sets.newHashSet(); + // add partitions + int length = nodes.size(); + int count = Math.min(partitionSize, length); + for (int i = 0; i < length; i++) { + int index = i; + Set set = new HashSet<>(count); + for (int j = 0; j < count; j++) { + set.add(sorted.get((i + j) % length).id()); + } + partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set)); + } + return partitions; + } + + /** + * Processes metadata by adding and removing nodes from the cluster. + */ + private synchronized void processMetadata(ClusterMetadata metadata) { + try { + ClusterMetadataDiff examiner = + new ClusterMetadataDiff(currentMetadata.get(), metadata); + examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort())); + examiner.nodesRemoved().forEach(this::removeNode); + } finally { + currentMetadata.set(metadata); + } + } + + private class InternalClusterMetadataListener implements ClusterMetadataEventListener { + @Override + public void event(ClusterMetadataEvent event) { + processMetadata(event.subject()); + } } } diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MembershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MembershipManager.java new file mode 100644 index 0000000000..8d869ca971 --- /dev/null +++ b/core/net/src/main/java/org/onosproject/cluster/impl/MembershipManager.java @@ -0,0 +1,119 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.cluster.impl; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.Member; +import org.onosproject.cluster.MembershipGroup; +import org.onosproject.cluster.MembershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.Version; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Cluster membership manager. + */ +@Component(immediate = true) +@Service +public class MembershipManager implements MembershipService { + + private final Logger log = getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + private Member localMember; + + @Activate + public void activate() { + localMember = new Member( + clusterService.getLocalNode().id(), + clusterService.getVersion(clusterService.getLocalNode().id())); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + private Member toMemberId(ControllerNode node) { + return new Member(node.id(), clusterService.getVersion(node.id())); + } + + @Override + public Member getLocalMember() { + return localMember; + } + + @Override + public MembershipGroup getLocalGroup() { + return getGroup(getLocalMember().version()); + } + + @Override + public Set getMembers() { + return clusterService.getNodes().stream() + .map(this::toMemberId) + .collect(Collectors.toSet()); + } + + @Override + public Collection getGroups() { + Map> groups = Maps.newHashMap(); + clusterService.getNodes().stream() + .map(this::toMemberId) + .forEach(member -> + groups.computeIfAbsent(member.version(), k -> Sets.newHashSet()).add(member)); + return Maps.transformEntries(groups, MembershipGroup::new).values(); + } + + @Override + public MembershipGroup getGroup(Version version) { + return new MembershipGroup(version, getMembers(version)); + } + + @Override + public Set getMembers(Version version) { + return getMembers() + .stream() + .filter(m -> Objects.equals(m.version(), version)) + .collect(Collectors.toSet()); + } + + @Override + public Member getMember(NodeId nodeId) { + ControllerNode node = clusterService.getNode(nodeId); + return node != null ? toMemberId(node) : null; + } +} diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java deleted file mode 100644 index 3617305cc4..0000000000 --- a/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright 2014-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.cluster.impl; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.collect.Sets; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.apache.karaf.system.SystemService; -import org.joda.time.DateTime; -import org.onlab.packet.IpAddress; -import org.onlab.util.Tools; -import org.onosproject.cluster.ClusterEvent; -import org.onosproject.cluster.ClusterEventListener; -import org.onosproject.cluster.ClusterMetadata; -import org.onosproject.cluster.ClusterMetadataAdminService; -import org.onosproject.cluster.ClusterMetadataDiff; -import org.onosproject.cluster.ClusterMetadataEvent; -import org.onosproject.cluster.ClusterMetadataEventListener; -import org.onosproject.cluster.ClusterMetadataService; -import org.onosproject.cluster.ClusterStore; -import org.onosproject.cluster.ClusterStoreDelegate; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.DefaultPartition; -import org.onosproject.cluster.UnifiedClusterAdminService; -import org.onosproject.cluster.UnifiedClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.cluster.Partition; -import org.onosproject.cluster.PartitionId; -import org.onosproject.core.Version; -import org.onosproject.event.AbstractListenerManager; -import org.slf4j.Logger; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.security.AppGuard.checkPermission; -import static org.onosproject.security.AppPermission.Type.CLUSTER_READ; -import static org.slf4j.LoggerFactory.getLogger; - -/** - * Implementation of the cluster service. - */ -@Component(immediate = true) -@Service -public class UnifiedClusterManager - extends AbstractListenerManager - implements UnifiedClusterService, UnifiedClusterAdminService { - - public static final String INSTANCE_ID_NULL = "Instance ID cannot be null"; - private static final int DEFAULT_PARTITION_SIZE = 3; - private final Logger log = getLogger(getClass()); - - private ClusterStoreDelegate delegate = new InternalStoreDelegate(); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterMetadataService clusterMetadataService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterMetadataAdminService clusterMetadataAdminService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterStore store; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected SystemService systemService; - - private final AtomicReference currentMetadata = new AtomicReference<>(); - private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener(); - - @Activate - public void activate() { - store.setDelegate(delegate); - eventDispatcher.addSink(ClusterEvent.class, listenerRegistry); - clusterMetadataService.addListener(metadataListener); - processMetadata(clusterMetadataService.getClusterMetadata()); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - clusterMetadataService.removeListener(metadataListener); - store.unsetDelegate(delegate); - eventDispatcher.removeSink(ClusterEvent.class); - log.info("Stopped"); - } - - @Override - public ControllerNode getLocalNode() { - checkPermission(CLUSTER_READ); - return store.getLocalNode(); - } - - @Override - public Set getNodes() { - checkPermission(CLUSTER_READ); - return store.getNodes(); - } - - @Override - public ControllerNode getNode(NodeId nodeId) { - checkPermission(CLUSTER_READ); - checkNotNull(nodeId, INSTANCE_ID_NULL); - return store.getNode(nodeId); - } - - @Override - public ControllerNode.State getState(NodeId nodeId) { - checkPermission(CLUSTER_READ); - checkNotNull(nodeId, INSTANCE_ID_NULL); - return store.getState(nodeId); - } - - @Override - public Version getVersion(NodeId nodeId) { - checkPermission(CLUSTER_READ); - checkNotNull(nodeId, INSTANCE_ID_NULL); - return store.getVersion(nodeId); - } - - @Override - public void markFullyStarted(boolean started) { - store.markFullyStarted(started); - } - - @Override - public DateTime getLastUpdated(NodeId nodeId) { - checkPermission(CLUSTER_READ); - return store.getLastUpdated(nodeId); - } - - @Override - public void formCluster(Set nodes) { - formCluster(nodes, DEFAULT_PARTITION_SIZE); - } - - @Override - public void formCluster(Set nodes, int partitionSize) { - checkNotNull(nodes, "Nodes cannot be null"); - checkArgument(!nodes.isEmpty(), "Nodes cannot be empty"); - - ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize)); - clusterMetadataAdminService.setClusterMetadata(metadata); - try { - log.warn("Shutting down container for cluster reconfiguration!"); - // Clean up persistent state associated with previous cluster configuration. - Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions"); - systemService.reboot("now", SystemService.Swipe.NONE); - } catch (Exception e) { - log.error("Unable to reboot container", e); - } - } - - @Override - public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) { - checkNotNull(nodeId, INSTANCE_ID_NULL); - checkNotNull(ip, "IP address cannot be null"); - checkArgument(tcpPort > 5000, "TCP port must be > 5000"); - return store.addNode(nodeId, ip, tcpPort); - } - - @Override - public void removeNode(NodeId nodeId) { - checkNotNull(nodeId, INSTANCE_ID_NULL); - store.removeNode(nodeId); - } - - // Store delegate to re-post events emitted from the store. - private class InternalStoreDelegate implements ClusterStoreDelegate { - @Override - public void notify(ClusterEvent event) { - post(event); - } - } - - private static Set buildDefaultPartitions(Collection nodes, int partitionSize) { - List sorted = new ArrayList<>(nodes); - Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString())); - Set partitions = Sets.newHashSet(); - // add partitions - int length = nodes.size(); - int count = Math.min(partitionSize, length); - for (int i = 0; i < length; i++) { - int index = i; - Set set = new HashSet<>(count); - for (int j = 0; j < count; j++) { - set.add(sorted.get((i + j) % length).id()); - } - partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set)); - } - return partitions; - } - - /** - * Processes metadata by adding and removing nodes from the cluster. - */ - private synchronized void processMetadata(ClusterMetadata metadata) { - try { - ClusterMetadataDiff examiner = - new ClusterMetadataDiff(currentMetadata.get(), metadata); - examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort())); - examiner.nodesRemoved().forEach(this::removeNode); - } finally { - currentMetadata.set(metadata); - } - } - - private class InternalClusterMetadataListener implements ClusterMetadataEventListener { - @Override - public void event(ClusterMetadataEvent event) { - processMetadata(event.subject()); - } - } -} diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java index 410b137494..9fdd7a23d5 100644 --- a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java +++ b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java @@ -28,9 +28,10 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onosproject.cluster.ClusterEvent; import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.MembershipService; import org.onosproject.cluster.NodeId; -import org.onosproject.cluster.UnifiedClusterService; import org.onosproject.core.Version; import org.onosproject.core.VersionService; import org.onosproject.event.AbstractListenerManager; @@ -70,7 +71,10 @@ public class UpgradeManager protected CoordinationService coordinationService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected UnifiedClusterService clusterService; + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MembershipService membershipService; private Version localVersion; private AtomicValue state; @@ -243,13 +247,8 @@ public class UpgradeManager } // Determine whether any nodes have not been upgraded to the target version. - boolean upgradeComplete = clusterService.getNodes() - .stream() - .allMatch(node -> { - ControllerNode.State state = clusterService.getState(node.id()); - Version version = clusterService.getVersion(node.id()); - return state.isActive() && version != null && version.equals(upgraded.target()); - }); + boolean upgradeComplete = membershipService.getGroups().size() == 1 + && membershipService.getLocalGroup().version().equals(upgraded.target()); // If some nodes have not yet been upgraded, throw an exception. if (!upgradeComplete) { @@ -333,13 +332,8 @@ public class UpgradeManager } // Determine whether any nodes are still running the target version. - boolean rollbackComplete = clusterService.getNodes() - .stream() - .allMatch(node -> { - ControllerNode.State state = clusterService.getState(node.id()); - Version version = clusterService.getVersion(node.id()); - return state.isActive() && version != null && version.equals(upgraded.source()); - }); + boolean rollbackComplete = membershipService.getGroups().size() == 1 + && membershipService.getLocalGroup().version().equals(upgraded.source()); // If some nodes have not yet been downgraded, throw an exception. if (!rollbackComplete) { diff --git a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java index 48337cdd24..78127eead4 100644 --- a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java +++ b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java @@ -16,17 +16,24 @@ package org.onosproject.upgrade.impl; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.junit.Test; import org.onlab.packet.IpAddress; import org.onosproject.cluster.ClusterEvent; -import org.onosproject.cluster.UnifiedClusterServiceAdapter; +import org.onosproject.cluster.ClusterServiceAdapter; import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.DefaultControllerNode; +import org.onosproject.cluster.Member; +import org.onosproject.cluster.MembershipGroup; +import org.onosproject.cluster.MembershipServiceAdapter; import org.onosproject.cluster.NodeId; import org.onosproject.core.Version; import org.onosproject.core.VersionServiceAdapter; @@ -59,7 +66,29 @@ public class UpgradeManagerTest { @SuppressWarnings("unchecked") private UpgradeManager createUpgradeManager(Version version, Upgrade state, List versions) { UpgradeManager upgradeManager = new UpgradeManager(); - upgradeManager.clusterService = new UnifiedClusterServiceAdapter() { + upgradeManager.membershipService = new MembershipServiceAdapter() { + @Override + public MembershipGroup getLocalGroup() { + return getGroups() + .stream() + .filter(group -> group.version().equals(version)) + .findFirst() + .get(); + } + + @Override + public Collection getGroups() { + AtomicInteger nodeCounter = new AtomicInteger(); + Map> groups = Maps.newHashMap(); + versions.stream().forEach(version -> { + groups.computeIfAbsent(version, k -> Sets.newHashSet()) + .add(new Member(NodeId.nodeId(String.valueOf(nodeCounter.getAndIncrement())), version)); + }); + return Maps.transformEntries(groups, MembershipGroup::new).values(); + } + }; + + upgradeManager.clusterService = new ClusterServiceAdapter() { @Override public Set getNodes() { AtomicInteger nodeCounter = new AtomicInteger(); @@ -83,11 +112,6 @@ public class UpgradeManagerTest { .orElse(null); } - @Override - public ControllerNode.State getState(NodeId nodeId) { - return ControllerNode.State.READY; - } - @Override public Version getVersion(NodeId nodeId) { return versions.get(Integer.parseInt(nodeId.id())); diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java deleted file mode 100644 index aa961314e5..0000000000 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.cluster.messaging.impl; - -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; - -import com.google.common.base.Objects; -import com.google.common.base.Throwables; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.onlab.util.Tools; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.UnifiedClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.ClusterMessage; -import org.onosproject.store.cluster.messaging.ClusterMessageHandler; -import org.onosproject.store.cluster.messaging.Endpoint; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.cluster.messaging.MessagingService; -import org.onosproject.utils.MeteringAgent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.security.AppGuard.checkPermission; -import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE; - -@Component(componentAbstract = true) -public abstract class AbstractClusterCommunicationManager - implements ClusterCommunicationService { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true); - private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true); - - private static final String PRIMITIVE_NAME = "clusterCommunication"; - private static final String SUBJECT_PREFIX = "subject"; - private static final String ENDPOINT_PREFIX = "endpoint"; - - private static final String SERIALIZING = "serialization"; - private static final String DESERIALIZING = "deserialization"; - private static final String NODE_PREFIX = "node:"; - private static final String ROUND_TRIP_SUFFIX = ".rtt"; - private static final String ONE_WAY_SUFFIX = ".oneway"; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected UnifiedClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MessagingService messagingService; - - private NodeId localNodeId; - - /** - * Returns the type for the given message subject. - * - * @param subject the type for the given message subject - * @return the message subject - */ - protected abstract String getType(MessageSubject subject); - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - log.info("Stopped"); - } - - @Override - public void broadcast(M message, - MessageSubject subject, - Function encoder) { - checkPermission(CLUSTER_WRITE); - multicast(message, - subject, - encoder, - clusterService.getNodes() - .stream() - .filter(node -> !Objects.equal(node, clusterService.getLocalNode())) - .map(ControllerNode::id) - .collect(Collectors.toSet())); - } - - @Override - public void broadcastIncludeSelf(M message, - MessageSubject subject, - Function encoder) { - checkPermission(CLUSTER_WRITE); - multicast(message, - subject, - encoder, - clusterService.getNodes() - .stream() - .map(ControllerNode::id) - .collect(Collectors.toSet())); - } - - @Override - public CompletableFuture unicast(M message, - MessageSubject subject, - Function encoder, - NodeId toNodeId) { - checkPermission(CLUSTER_WRITE); - try { - byte[] payload = new ClusterMessage( - localNodeId, - subject, - timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message) - ).getBytes(); - return doUnicast(subject, payload, toNodeId); - } catch (Exception e) { - return Tools.exceptionalFuture(e); - } - } - - @Override - public void multicast(M message, - MessageSubject subject, - Function encoder, - Set nodes) { - checkPermission(CLUSTER_WRITE); - byte[] payload = new ClusterMessage( - localNodeId, - subject, - timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)) - .getBytes(); - nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId)); - } - - @Override - public CompletableFuture sendAndReceive(M message, - MessageSubject subject, - Function encoder, - Function decoder, - NodeId toNodeId) { - checkPermission(CLUSTER_WRITE); - try { - ClusterMessage envelope = new ClusterMessage( - clusterService.getLocalNode().id(), - subject, - timeFunction(encoder, subjectMeteringAgent, SERIALIZING). - apply(message)); - return sendAndReceive(subject, envelope.getBytes(), toNodeId). - thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes)); - } catch (Exception e) { - return Tools.exceptionalFuture(e); - } - } - - private CompletableFuture doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) { - ControllerNode node = clusterService.getNode(toNodeId); - checkArgument(node != null, "Unknown nodeId: %s", toNodeId); - Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort()); - MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX); - return messagingService.sendAsync(nodeEp, getType(subject), payload).whenComplete((r, e) -> context.stop(e)); - } - - private CompletableFuture sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) { - ControllerNode node = clusterService.getNode(toNodeId); - checkArgument(node != null, "Unknown nodeId: %s", toNodeId); - Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort()); - MeteringAgent.Context epContext = endpointMeteringAgent. - startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX); - MeteringAgent.Context subjectContext = subjectMeteringAgent. - startTimer(subject.toString() + ROUND_TRIP_SUFFIX); - return messagingService.sendAndReceive(nodeEp, getType(subject), payload). - whenComplete((bytes, throwable) -> { - subjectContext.stop(throwable); - epContext.stop(throwable); - }); - } - - @Override - public void addSubscriber(MessageSubject subject, - ClusterMessageHandler subscriber, - ExecutorService executor) { - checkPermission(CLUSTER_WRITE); - messagingService.registerHandler(getType(subject), - new InternalClusterMessageHandler(subscriber), - executor); - } - - @Override - public void removeSubscriber(MessageSubject subject) { - checkPermission(CLUSTER_WRITE); - messagingService.unregisterHandler(getType(subject)); - } - - @Override - public void addSubscriber(MessageSubject subject, - Function decoder, - Function handler, - Function encoder, - Executor executor) { - checkPermission(CLUSTER_WRITE); - messagingService.registerHandler(getType(subject), - new InternalMessageResponder(decoder, encoder, m -> { - CompletableFuture responseFuture = new CompletableFuture<>(); - executor.execute(() -> { - try { - responseFuture.complete(handler.apply(m)); - } catch (Exception e) { - responseFuture.completeExceptionally(e); - } - }); - return responseFuture; - })); - } - - @Override - public void addSubscriber(MessageSubject subject, - Function decoder, - Function> handler, - Function encoder) { - checkPermission(CLUSTER_WRITE); - messagingService.registerHandler(getType(subject), - new InternalMessageResponder<>(decoder, encoder, handler)); - } - - @Override - public void addSubscriber(MessageSubject subject, - Function decoder, - Consumer handler, - Executor executor) { - checkPermission(CLUSTER_WRITE); - messagingService.registerHandler(getType(subject), - new InternalMessageConsumer<>(decoder, handler), - executor); - } - - /** - * Performs the timed function, returning the value it would while timing the operation. - * - * @param timedFunction the function to be timed - * @param meter the metering agent to be used to time the function - * @param opName the opname to be used when starting the meter - * @param The param type of the function - * @param The return type of the function - * @return the value returned by the timed function - */ - private Function timeFunction(Function timedFunction, - MeteringAgent meter, String opName) { - checkNotNull(timedFunction); - checkNotNull(meter); - checkNotNull(opName); - return new Function() { - @Override - public B apply(A a) { - final MeteringAgent.Context context = meter.startTimer(opName); - B result = null; - try { - result = timedFunction.apply(a); - context.stop(null); - return result; - } catch (Exception e) { - context.stop(e); - Throwables.propagate(e); - return null; - } - } - }; - } - - - private class InternalClusterMessageHandler implements BiFunction { - private ClusterMessageHandler handler; - - public InternalClusterMessageHandler(ClusterMessageHandler handler) { - this.handler = handler; - } - - @Override - public byte[] apply(Endpoint sender, byte[] bytes) { - ClusterMessage message = ClusterMessage.fromBytes(bytes); - handler.handle(message); - return message.response(); - } - } - - private class InternalMessageResponder implements BiFunction> { - private final Function decoder; - private final Function encoder; - private final Function> handler; - - public InternalMessageResponder(Function decoder, - Function encoder, - Function> handler) { - this.decoder = decoder; - this.encoder = encoder; - this.handler = handler; - } - - @Override - public CompletableFuture apply(Endpoint sender, byte[] bytes) { - return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING). - apply(ClusterMessage.fromBytes(bytes).payload())). - thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m)); - } - } - - private class InternalMessageConsumer implements BiConsumer { - private final Function decoder; - private final Consumer consumer; - - public InternalMessageConsumer(Function decoder, Consumer consumer) { - this.decoder = decoder; - this.consumer = consumer; - } - - @Override - public void accept(Endpoint sender, byte[] bytes) { - consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING). - apply(ClusterMessage.fromBytes(bytes).payload())); - } - } -} diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java index 46027025a1..868006b4fa 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-present Open Networking Foundation + * Copyright 2017-present Open Networking Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,24 +15,326 @@ */ package org.onosproject.store.cluster.messaging.impl; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.base.Objects; +import com.google.common.base.Throwables; +import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; -import org.onosproject.core.VersionService; +import org.onlab.util.Tools; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.NodeId; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterMessage; +import org.onosproject.store.cluster.messaging.ClusterMessageHandler; +import org.onosproject.store.cluster.messaging.Endpoint; import org.onosproject.store.cluster.messaging.MessageSubject; +import org.onosproject.store.cluster.messaging.MessagingService; +import org.onosproject.utils.MeteringAgent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.onosproject.security.AppGuard.checkPermission; +import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE; @Component(immediate = true) @Service -public class ClusterCommunicationManager extends AbstractClusterCommunicationManager { +public class ClusterCommunicationManager implements ClusterCommunicationService { - private static final char VERSION_SEP = '-'; + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true); + private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true); + + private static final String PRIMITIVE_NAME = "clusterCommunication"; + private static final String SUBJECT_PREFIX = "subject"; + private static final String ENDPOINT_PREFIX = "endpoint"; + + private static final String SERIALIZING = "serialization"; + private static final String DESERIALIZING = "deserialization"; + private static final String NODE_PREFIX = "node:"; + private static final String ROUND_TRIP_SUFFIX = ".rtt"; + private static final String ONE_WAY_SUFFIX = ".oneway"; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - private VersionService versionService; + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MessagingService messagingService; + + private NodeId localNodeId; + + @Activate + public void activate() { + localNodeId = clusterService.getLocalNode().id(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } @Override - protected String getType(MessageSubject subject) { - return subject.value() + VERSION_SEP + versionService.version().toString(); + public void broadcast(M message, + MessageSubject subject, + Function encoder) { + checkPermission(CLUSTER_WRITE); + multicast(message, + subject, + encoder, + clusterService.getNodes() + .stream() + .filter(node -> !Objects.equal(node, clusterService.getLocalNode())) + .map(ControllerNode::id) + .collect(Collectors.toSet())); + } + + @Override + public void broadcastIncludeSelf(M message, + MessageSubject subject, + Function encoder) { + checkPermission(CLUSTER_WRITE); + multicast(message, + subject, + encoder, + clusterService.getNodes() + .stream() + .map(ControllerNode::id) + .collect(Collectors.toSet())); + } + + @Override + public CompletableFuture unicast(M message, + MessageSubject subject, + Function encoder, + NodeId toNodeId) { + checkPermission(CLUSTER_WRITE); + try { + byte[] payload = new ClusterMessage( + localNodeId, + subject, + timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message) + ).getBytes(); + return doUnicast(subject, payload, toNodeId); + } catch (Exception e) { + return Tools.exceptionalFuture(e); + } + } + + @Override + public void multicast(M message, + MessageSubject subject, + Function encoder, + Set nodes) { + checkPermission(CLUSTER_WRITE); + byte[] payload = new ClusterMessage( + localNodeId, + subject, + timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)) + .getBytes(); + nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId)); + } + + @Override + public CompletableFuture sendAndReceive(M message, + MessageSubject subject, + Function encoder, + Function decoder, + NodeId toNodeId) { + checkPermission(CLUSTER_WRITE); + try { + ClusterMessage envelope = new ClusterMessage( + clusterService.getLocalNode().id(), + subject, + timeFunction(encoder, subjectMeteringAgent, SERIALIZING). + apply(message)); + return sendAndReceive(subject, envelope.getBytes(), toNodeId). + thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes)); + } catch (Exception e) { + return Tools.exceptionalFuture(e); + } + } + + private CompletableFuture doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) { + ControllerNode node = clusterService.getNode(toNodeId); + checkArgument(node != null, "Unknown nodeId: %s", toNodeId); + Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort()); + MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX); + return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e)); + } + + private CompletableFuture sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) { + ControllerNode node = clusterService.getNode(toNodeId); + checkArgument(node != null, "Unknown nodeId: %s", toNodeId); + Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort()); + MeteringAgent.Context epContext = endpointMeteringAgent. + startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX); + MeteringAgent.Context subjectContext = subjectMeteringAgent. + startTimer(subject.toString() + ROUND_TRIP_SUFFIX); + return messagingService.sendAndReceive(nodeEp, subject.toString(), payload). + whenComplete((bytes, throwable) -> { + subjectContext.stop(throwable); + epContext.stop(throwable); + }); + } + + @Override + public void addSubscriber(MessageSubject subject, + ClusterMessageHandler subscriber, + ExecutorService executor) { + checkPermission(CLUSTER_WRITE); + messagingService.registerHandler(subject.toString(), + new InternalClusterMessageHandler(subscriber), + executor); + } + + @Override + public void removeSubscriber(MessageSubject subject) { + checkPermission(CLUSTER_WRITE); + messagingService.unregisterHandler(subject.toString()); + } + + @Override + public void addSubscriber(MessageSubject subject, + Function decoder, + Function handler, + Function encoder, + Executor executor) { + checkPermission(CLUSTER_WRITE); + messagingService.registerHandler(subject.toString(), + new InternalMessageResponder(decoder, encoder, m -> { + CompletableFuture responseFuture = new CompletableFuture<>(); + executor.execute(() -> { + try { + responseFuture.complete(handler.apply(m)); + } catch (Exception e) { + responseFuture.completeExceptionally(e); + } + }); + return responseFuture; + })); + } + + @Override + public void addSubscriber(MessageSubject subject, + Function decoder, + Function> handler, + Function encoder) { + checkPermission(CLUSTER_WRITE); + messagingService.registerHandler(subject.toString(), + new InternalMessageResponder<>(decoder, encoder, handler)); + } + + @Override + public void addSubscriber(MessageSubject subject, + Function decoder, + Consumer handler, + Executor executor) { + checkPermission(CLUSTER_WRITE); + messagingService.registerHandler(subject.toString(), + new InternalMessageConsumer<>(decoder, handler), + executor); + } + + /** + * Performs the timed function, returning the value it would while timing the operation. + * + * @param timedFunction the function to be timed + * @param meter the metering agent to be used to time the function + * @param opName the opname to be used when starting the meter + * @param The param type of the function + * @param The return type of the function + * @return the value returned by the timed function + */ + private Function timeFunction(Function timedFunction, + MeteringAgent meter, String opName) { + checkNotNull(timedFunction); + checkNotNull(meter); + checkNotNull(opName); + return new Function() { + @Override + public B apply(A a) { + final MeteringAgent.Context context = meter.startTimer(opName); + B result = null; + try { + result = timedFunction.apply(a); + context.stop(null); + return result; + } catch (Exception e) { + context.stop(e); + Throwables.propagate(e); + return null; + } + } + }; + } + + + private class InternalClusterMessageHandler implements BiFunction { + private ClusterMessageHandler handler; + + public InternalClusterMessageHandler(ClusterMessageHandler handler) { + this.handler = handler; + } + + @Override + public byte[] apply(Endpoint sender, byte[] bytes) { + ClusterMessage message = ClusterMessage.fromBytes(bytes); + handler.handle(message); + return message.response(); + } + } + + private class InternalMessageResponder implements BiFunction> { + private final Function decoder; + private final Function encoder; + private final Function> handler; + + public InternalMessageResponder(Function decoder, + Function encoder, + Function> handler) { + this.decoder = decoder; + this.encoder = encoder; + this.handler = handler; + } + + @Override + public CompletableFuture apply(Endpoint sender, byte[] bytes) { + return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING). + apply(ClusterMessage.fromBytes(bytes).payload())). + thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m)); + } + } + + private class InternalMessageConsumer implements BiConsumer { + private final Function decoder; + private final Consumer consumer; + + public InternalMessageConsumer(Function decoder, Consumer consumer) { + this.decoder = decoder; + this.consumer = consumer; + } + + @Override + public void accept(Endpoint sender, byte[] bytes) { + consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING). + apply(ClusterMessage.fromBytes(bytes).payload())); + } } } diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java deleted file mode 100644 index 45c84af372..0000000000 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.cluster.messaging.impl; - -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Service; -import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; - -@Component(immediate = true) -@Service -public class UnifiedClusterCommunicationManager - extends AbstractClusterCommunicationManager - implements UnifiedClusterCommunicationService { - @Override - protected String getType(MessageSubject subject) { - return subject.value(); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java index 29045a276c..80753f366e 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java @@ -25,12 +25,12 @@ import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; -import org.onosproject.cluster.UnifiedClusterService; +import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.DefaultPartition; import org.onosproject.cluster.PartitionId; import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.primitives.DistributedPrimitiveCreator; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.AsyncAtomicValue; @@ -70,10 +70,10 @@ public class CoordinationManager implements CoordinationService { private final Logger log = getLogger(getClass()); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected UnifiedClusterService clusterService; + protected ClusterService clusterService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected UnifiedClusterCommunicationService clusterCommunicator; + protected ClusterCommunicationService clusterCommunicator; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected PersistenceService persistenceService; diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java index 1bfaaed955..9d6a016f74 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java @@ -15,21 +15,21 @@ */ package org.onosproject.store.primitives.impl; -import org.onlab.util.KryoNamespace; -import org.onosproject.cluster.MembershipService; -import org.onosproject.cluster.NodeId; -import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.Timestamp; -import org.onosproject.store.cluster.messaging.ClusterCommunicator; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapBuilder; - import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; +import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.persistence.PersistenceService; +import org.onosproject.store.Timestamp; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapBuilder; + import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -38,8 +38,8 @@ import static com.google.common.base.Preconditions.checkNotNull; */ public class EventuallyConsistentMapBuilderImpl implements EventuallyConsistentMapBuilder { - private final MembershipService clusterService; - private final ClusterCommunicator clusterCommunicator; + private final ClusterService clusterService; + private final ClusterCommunicationService clusterCommunicator; private String name; private KryoNamespace serializer; @@ -64,9 +64,10 @@ public class EventuallyConsistentMapBuilderImpl * @param clusterCommunicator cluster communication service * @param persistenceService persistence service */ - public EventuallyConsistentMapBuilderImpl(MembershipService clusterService, - ClusterCommunicator clusterCommunicator, - PersistenceService persistenceService) { + public EventuallyConsistentMapBuilderImpl( + ClusterService clusterService, + ClusterCommunicationService clusterCommunicator, + PersistenceService persistenceService) { this.persistenceService = persistenceService; this.clusterService = checkNotNull(clusterService); this.clusterCommunicator = checkNotNull(clusterCommunicator); diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java index 461245eebb..462d4ae4b2 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java @@ -15,34 +15,6 @@ */ package org.onosproject.store.primitives.impl; -import com.google.common.collect.Collections2; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.tuple.Pair; -import org.onlab.util.AbstractAccumulator; -import org.onlab.util.KryoNamespace; -import org.onlab.util.SlidingWindowCounter; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.MembershipService; -import org.onosproject.cluster.NodeId; -import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.LogicalTimestamp; -import org.onosproject.store.Timestamp; -import org.onosproject.store.cluster.messaging.ClusterCommunicator; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.DistributedPrimitive; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.WallClockTimestamp; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -67,6 +39,34 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.Pair; +import org.onlab.util.AbstractAccumulator; +import org.onlab.util.KryoNamespace; +import org.onlab.util.SlidingWindowCounter; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.NodeId; +import org.onosproject.persistence.PersistenceService; +import org.onosproject.store.LogicalTimestamp; +import org.onosproject.store.Timestamp; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.MessageSubject; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.DistributedPrimitive; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.WallClockTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -86,8 +86,8 @@ public class EventuallyConsistentMapImpl private final Map> items; - private final MembershipService clusterService; - private final ClusterCommunicator clusterCommunicator; + private final ClusterService clusterService; + private final ClusterCommunicationService clusterCommunicator; private final Serializer serializer; private final NodeId localNodeId; private final PersistenceService persistenceService; @@ -162,8 +162,8 @@ public class EventuallyConsistentMapImpl * @param persistenceService persistence service */ EventuallyConsistentMapImpl(String mapName, - MembershipService clusterService, - ClusterCommunicator clusterCommunicator, + ClusterService clusterService, + ClusterCommunicationService clusterCommunicator, KryoNamespace ns, BiFunction timestampProvider, BiFunction> peerUpdateFunction, diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java index 4a926829ae..5bbd681cae 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java @@ -45,7 +45,7 @@ import org.onosproject.cluster.PartitionId; import org.onosproject.core.Version; import org.onosproject.core.VersionService; import org.onosproject.event.AbstractListenerManager; -import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.primitives.DistributedPrimitiveCreator; import org.onosproject.store.primitives.PartitionAdminService; import org.onosproject.store.primitives.PartitionEvent; @@ -71,7 +71,7 @@ public class PartitionManager extends AbstractListenerManager inactivePartitions.put(partition.getId(), new StoragePartition( - partition, - sourceVersion, - null, - clusterCommunicator, - clusterService, - new File(System.getProperty("karaf.data") + - "/partitions/" + sourceVersion + "/" + partition.getId())))); - currentClusterMetadata.get() - .getPartitions() - .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition( - partition, - targetVersion, - sourceVersion, - clusterCommunicator, - clusterService, - new File(System.getProperty("karaf.data") + - "/partitions/" + targetVersion + "/" + partition.getId())))); + .forEach(partition -> { + inactivePartitions.put(partition.getId(), new StoragePartition( + partition, + sourceVersion, + null, + clusterCommunicator, + clusterService, + new File(System.getProperty("karaf.data") + + "/partitions/" + sourceVersion + "/" + partition.getId()))); + activePartitions.put(partition.getId(), new StoragePartition( + partition, + targetVersion, + sourceVersion, + clusterCommunicator, + clusterService, + new File(System.getProperty("karaf.data") + + "/partitions/" + targetVersion + "/" + partition.getId()))); + }); // We have to fork existing partitions before we can start inactive partition servers to // avoid duplicate message handlers when both servers are running. diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java index 8bae2a37ef..ecba56dd94 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java @@ -40,7 +40,7 @@ import io.atomix.protocols.raft.protocol.RaftClientProtocol; import io.atomix.protocols.raft.protocol.ResetRequest; import io.atomix.protocols.raft.session.SessionId; import org.onosproject.cluster.NodeId; -import org.onosproject.store.cluster.messaging.ClusterCommunicator; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.service.Serializer; /** @@ -51,7 +51,7 @@ public class RaftClientCommunicator extends RaftCommunicator implements RaftClie public RaftClientCommunicator( String prefix, Serializer serializer, - ClusterCommunicator clusterCommunicator) { + ClusterCommunicationService clusterCommunicator) { super(new RaftMessageContext(prefix), serializer, clusterCommunicator); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java index 765eb02525..1117ab9ba2 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java @@ -22,7 +22,7 @@ import java.util.concurrent.CompletionException; import io.atomix.protocols.raft.RaftException; import io.atomix.protocols.raft.cluster.MemberId; import org.onosproject.cluster.NodeId; -import org.onosproject.store.cluster.messaging.ClusterCommunicator; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.cluster.messaging.MessagingException; import org.onosproject.store.service.Serializer; @@ -35,12 +35,12 @@ import static com.google.common.base.Preconditions.checkNotNull; public abstract class RaftCommunicator { protected final RaftMessageContext context; protected final Serializer serializer; - protected final ClusterCommunicator clusterCommunicator; + protected final ClusterCommunicationService clusterCommunicator; public RaftCommunicator( RaftMessageContext context, Serializer serializer, - ClusterCommunicator clusterCommunicator) { + ClusterCommunicationService clusterCommunicator) { this.context = checkNotNull(context, "context cannot be null"); this.serializer = checkNotNull(serializer, "serializer cannot be null"); this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null"); diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java index 2710a2caed..097ee469c6 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java @@ -57,7 +57,6 @@ import io.atomix.protocols.raft.protocol.VoteResponse; import io.atomix.protocols.raft.session.SessionId; import org.onosproject.cluster.NodeId; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.ClusterCommunicator; import org.onosproject.store.service.Serializer; /** @@ -68,7 +67,7 @@ public class RaftServerCommunicator extends RaftCommunicator implements RaftServ public RaftServerCommunicator( String prefix, Serializer serializer, - ClusterCommunicator clusterCommunicator) { + ClusterCommunicationService clusterCommunicator) { super(new RaftMessageContext(prefix), serializer, clusterCommunicator); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java index 59c4b17731..b73dd61051 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java @@ -29,10 +29,10 @@ import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.PartitionId; -import org.onosproject.cluster.UnifiedClusterService; import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.primitives.DistributedPrimitiveCreator; import org.onosproject.store.primitives.PartitionAdminService; import org.onosproject.store.primitives.PartitionService; @@ -81,10 +81,10 @@ public class StorageManager implements StorageService, StorageAdminService { private final Logger log = getLogger(getClass()); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected UnifiedClusterService clusterService; + protected ClusterService clusterService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected UnifiedClusterCommunicationService clusterCommunicator; + protected ClusterCommunicationService clusterCommunicator; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected PersistenceService persistenceService; diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java index 414c49c388..5458edd9cb 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java @@ -29,12 +29,12 @@ import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import io.atomix.protocols.raft.cluster.MemberId; import io.atomix.protocols.raft.service.RaftService; -import org.onosproject.cluster.MembershipService; +import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.NodeId; import org.onosproject.cluster.Partition; import org.onosproject.cluster.PartitionId; import org.onosproject.core.Version; -import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService; import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService; import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService; @@ -54,8 +54,8 @@ import org.onosproject.store.service.Serializer; public class StoragePartition implements Managed { private final AtomicBoolean isOpened = new AtomicBoolean(false); - private final UnifiedClusterCommunicationService clusterCommunicator; - private final MembershipService clusterService; + private final ClusterCommunicationService clusterCommunicator; + private final ClusterService clusterService; private final Version version; private final Version source; private final File dataFolder; @@ -85,8 +85,8 @@ public class StoragePartition implements Managed { Partition partition, Version version, Version source, - UnifiedClusterCommunicationService clusterCommunicator, - MembershipService clusterService, + ClusterCommunicationService clusterCommunicator, + ClusterService clusterService, File dataFolder) { this.partition = partition; this.version = version; @@ -191,6 +191,10 @@ public class StoragePartition implements Managed { return source != null ? clusterService.getNodes() .stream() + .filter(node -> { + Version nodeVersion = clusterService.getVersion(node.id()); + return nodeVersion != null && nodeVersion.equals(version); + }) .map(node -> MemberId.from(node.id().id())) .collect(Collectors.toList()) : Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id())); @@ -202,6 +206,10 @@ public class StoragePartition implements Managed { } else { return clusterService.getNodes() .stream() + .filter(node -> { + Version nodeVersion = clusterService.getVersion(node.id()); + return nodeVersion != null && nodeVersion.equals(version); + }) .map(node -> MemberId.from(node.id().id())) .collect(Collectors.toList()); } @@ -231,13 +239,10 @@ public class StoragePartition implements Managed { clusterCommunicator); CompletableFuture future; - if (clusterService.getNodes().size() == 1) { + if (getMemberIds().size() == 1) { future = server.fork(version); } else { - future = server.join(clusterService.getNodes().stream() - .filter(node -> !node.id().equals(localNodeId)) - .map(node -> MemberId.from(node.id().id())) - .collect(Collectors.toList())); + future = server.join(getMemberIds()); } return future.thenRun(() -> this.server = server); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java index 123c3b6a40..3a15fceccb 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java @@ -28,7 +28,7 @@ import io.atomix.protocols.raft.cluster.RaftMember; import io.atomix.protocols.raft.storage.RaftStorage; import io.atomix.storage.StorageLevel; import org.onosproject.core.Version; -import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter; import org.onosproject.store.service.PartitionInfo; import org.onosproject.store.service.Serializer; @@ -49,13 +49,13 @@ public class StoragePartitionServer implements Managed { private final MemberId localMemberId; private final StoragePartition partition; - private final UnifiedClusterCommunicationService clusterCommunicator; + private final ClusterCommunicationService clusterCommunicator; private RaftServer server; public StoragePartitionServer( StoragePartition partition, MemberId localMemberId, - UnifiedClusterCommunicationService clusterCommunicator) { + ClusterCommunicationService clusterCommunicator) { this.partition = partition; this.localMemberId = localMemberId; this.clusterCommunicator = clusterCommunicator;