Replace Unified* services with MembershipService for subgroup membership

Change-Id: Iabff173ce3501d1ed300513cac445bb712614bd9
This commit is contained in:
Jordan Halterman 2017-10-17 17:29:10 -07:00
parent a0b8f65e8a
commit 28183eea1d
32 changed files with 1141 additions and 1200 deletions

View File

@ -22,8 +22,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.karaf.shell.commands.Command;
import org.joda.time.DateTime;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.MembershipAdminService;
import org.onosproject.core.Version;
import org.onosproject.utils.Comparators;
@ -32,7 +32,6 @@ import java.util.List;
import static com.google.common.collect.Lists.newArrayList;
/**
* Lists all controller cluster nodes.
*/
@ -44,7 +43,7 @@ public class NodesListCommand extends AbstractShellCommand {
@Override
protected void execute() {
MembershipAdminService service = get(MembershipAdminService.class);
ClusterAdminService service = get(ClusterAdminService.class);
List<ControllerNode> nodes = newArrayList(service.getNodes());
Collections.sort(nodes, Comparators.NODE_COMPARATOR);
if (outputJson()) {
@ -68,7 +67,7 @@ public class NodesListCommand extends AbstractShellCommand {
}
// Produces JSON structure.
private JsonNode json(MembershipAdminService service, List<ControllerNode> nodes) {
private JsonNode json(ClusterAdminService service, List<ControllerNode> nodes) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode result = mapper.createArrayNode();
ControllerNode self = service.getLocalNode();

View File

@ -15,14 +15,56 @@
*/
package org.onosproject.cluster;
import java.util.Set;
import org.onlab.packet.IpAddress;
/**
* Service for administering the cluster node membership.
* <p>
* This service has a view of the cluster membership that is isolated to the local node's version during upgrades.
* For an equivalent service that has control over all nodes during an upgrade use
* {@link UnifiedClusterAdminService}.
*
* @see UnifiedClusterAdminService
*/
public interface ClusterAdminService extends MembershipAdminService {
public interface ClusterAdminService extends ClusterService {
/**
* Forms cluster configuration based on the specified set of node
* information.&nbsp; This method resets and restarts the controller
* instance.
*
* @param nodes set of nodes that form the cluster
*/
void formCluster(Set<ControllerNode> nodes);
/**
* Forms cluster configuration based on the specified set of node
* information.&nbsp; This method resets and restarts the controller
* instance.
*
* @param nodes set of nodes that form the cluster
* @param partitionSize number of nodes to compose a partition
*/
void formCluster(Set<ControllerNode> nodes, int partitionSize);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
/**
* Removes the specified node from the cluster node list.
*
* @param nodeId controller node identifier
*/
void removeNode(NodeId nodeId);
/**
* Marks the current node as fully started or not.
*
* @param started true indicates all components have been started
*/
void markFullyStarted(boolean started);
}

View File

@ -15,13 +15,63 @@
*/
package org.onosproject.cluster;
import java.util.Set;
import org.joda.time.DateTime;
import org.onosproject.core.Version;
import org.onosproject.event.ListenerService;
/**
* Service for obtaining information about the individual nodes within the controller cluster.
* <p>
* This service's view of the nodes in the cluster is isolated to a single version of the software. During upgrades,
* when multiple versions of the software are running in the same cluster, users of this service will only be able
* to see nodes running the same version as the local node. This is useful for limiting communication to nodes running
* the same version of the software.
*/
public interface ClusterService extends MembershipService {
public interface ClusterService extends ListenerService<ClusterEvent, ClusterEventListener> {
/**
* Returns the local controller node.
*
* @return local controller node
*/
ControllerNode getLocalNode();
/**
* Returns the set of current cluster members.
*
* @return set of cluster members
*/
Set<ControllerNode> getNodes();
/**
* Returns the specified controller node.
*
* @param nodeId controller node identifier
* @return controller node
*/
ControllerNode getNode(NodeId nodeId);
/**
* Returns the availability state of the specified controller node. Note
* that this does not imply that all the core and application components
* have been fully activated; only that the node has joined the cluster.
*
* @param nodeId controller node identifier
* @return availability state
*/
ControllerNode.State getState(NodeId nodeId);
/**
* Returns the version of the given controller node.
*
* @param nodeId controller node identifier
* @return controller version
*/
Version getVersion(NodeId nodeId);
/**
* Returns the system time when the availability state was last updated.
*
* @param nodeId controller node identifier
* @return system time when the availability state was last updated.
*/
DateTime getLastUpdated(NodeId nodeId);
}

View File

@ -74,7 +74,6 @@ public interface ControllerNode {
*/
IpAddress ip();
/**
* Returns the TCP port on which the node listens for connections.
*

View File

@ -0,0 +1,83 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import java.util.Objects;
import org.onosproject.core.Version;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Controller member identity.
*/
public final class Member {
private final NodeId nodeId;
private final Version version;
/**
* Creates a new cluster member identifier from the specified string.
*
* @param nodeId node identifier
* @param version node version
*/
public Member(NodeId nodeId, Version version) {
this.nodeId = checkNotNull(nodeId);
this.version = version;
}
/**
* Returns the node identifier.
*
* @return the node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the node version.
*
* @return the node version
*/
public Version version() {
return version;
}
@Override
public int hashCode() {
return Objects.hash(nodeId, version);
}
@Override
public boolean equals(Object object) {
if (object instanceof Member) {
Member member = (Member) object;
return member.nodeId.equals(nodeId) && Objects.equals(member.version, version);
}
return false;
}
@Override
public String toString() {
return toStringHelper(this)
.add("nodeId", nodeId)
.add("version", version)
.toString();
}
}

View File

@ -1,70 +0,0 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import java.util.Set;
import org.onlab.packet.IpAddress;
/**
* Service for administering the cluster node membership.
*/
public interface MembershipAdminService extends MembershipService {
/**
* Forms cluster configuration based on the specified set of node
* information.&nbsp; This method resets and restarts the controller
* instance.
*
* @param nodes set of nodes that form the cluster
*/
void formCluster(Set<ControllerNode> nodes);
/**
* Forms cluster configuration based on the specified set of node
* information.&nbsp; This method resets and restarts the controller
* instance.
*
* @param nodes set of nodes that form the cluster
* @param partitionSize number of nodes to compose a partition
*/
void formCluster(Set<ControllerNode> nodes, int partitionSize);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
/**
* Removes the specified node from the cluster node list.
*
* @param nodeId controller node identifier
*/
void removeNode(NodeId nodeId);
/**
* Marks the current node as fully started or not.
*
* @param started true indicates all components have been started
*/
void markFullyStarted(boolean started);
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import java.util.Set;
import org.onosproject.core.Version;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Membership group.
*/
public class MembershipGroup {
private final Version version;
private final Set<Member> members;
public MembershipGroup(Version version, Set<Member> members) {
this.version = version;
this.members = members;
}
/**
* Returns the group version.
*
* @return the group version
*/
public Version version() {
return version;
}
/**
* Returns the set of members in the group.
*
* @return the set of members in the group
*/
public Set<Member> members() {
return members;
}
@Override
public String toString() {
return toStringHelper(this)
.add("version", version)
.add("members", members)
.toString();
}
}

View File

@ -15,32 +15,59 @@
*/
package org.onosproject.cluster;
import java.util.Collection;
import java.util.Set;
import org.joda.time.DateTime;
import org.onosproject.core.Version;
import org.onosproject.event.ListenerService;
/**
* Service for obtaining information about the individual nodes within
* the controller cluster.
* Service for obtaining information about the individual members of the controller cluster.
*/
public interface MembershipService
extends ListenerService<ClusterEvent, ClusterEventListener> {
public interface MembershipService {
/**
* Returns the local controller node.
* Returns the local member.
*
* @return local controller node
* @return local member
*/
ControllerNode getLocalNode();
Member getLocalMember();
/**
* Returns the set of current cluster members.
* Returns the group associated with the local member.
*
* @return set of cluster members
* @return the group associated with the local member
*/
Set<ControllerNode> getNodes();
MembershipGroup getLocalGroup();
/**
* Returns the set of current cluster members in the local group.
*
* @return set of cluster members in the local group
*/
Set<Member> getMembers();
/**
* Returns the set of membership groups in the cluster.
*
* @return the set of membership groups in the cluster
*/
Collection<MembershipGroup> getGroups();
/**
* Returns the membership group for the given version.
*
* @param version the version for which to return the membership group
* @return the membership group for the given version
*/
MembershipGroup getGroup(Version version);
/**
* Returns the set of members in the given version.
*
* @param version the version for which to return the set of members
* @return the set of members for the given version
*/
Set<Member> getMembers(Version version);
/**
* Returns the specified controller node.
@ -48,32 +75,6 @@ public interface MembershipService
* @param nodeId controller node identifier
* @return controller node
*/
ControllerNode getNode(NodeId nodeId);
/**
* Returns the availability state of the specified controller node. Note
* that this does not imply that all the core and application components
* have been fully activated; only that the node has joined the cluster.
*
* @param nodeId controller node identifier
* @return availability state
*/
ControllerNode.State getState(NodeId nodeId);
/**
* Returns the version of the given controller node.
*
* @param nodeId controller node identifier
* @return controller version
*/
Version getVersion(NodeId nodeId);
/**
* Returns the system time when the availability state was last updated.
*
* @param nodeId controller node identifier
* @return system time when the availability state was last updated.
*/
DateTime getLastUpdated(NodeId nodeId);
Member getMember(NodeId nodeId);
}

View File

@ -1,25 +0,0 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import com.google.common.annotations.Beta;
/**
* Cluster membership administration service that supports modification of all nodes during an upgrade.
*/
@Beta
public interface UnifiedClusterAdminService extends MembershipAdminService {
}

View File

@ -1,32 +0,0 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import com.google.common.annotations.Beta;
/**
* Unified multi-version cluster membership service.
* <p>
* During upgrades, the nodes within a cluster may be running multiple versions of the software.
* This service has a view of the entire cluster running any version. Users of this service must be careful when
* communicating with nodes described by this service as compatibility issues can result from communicating across
* versions. For an equivalent service that has an isolated view of the cluster, see {@link ClusterService}.
*
* @see ClusterService
*/
@Beta
public interface UnifiedClusterService extends MembershipService {
}

View File

@ -15,15 +15,152 @@
*/
package org.onosproject.store.cluster.messaging;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import org.onosproject.cluster.NodeId;
/**
* Service for assisting communications between controller cluster nodes.
* <p>
* Communication via this service is isolated to nodes running a single version of the software. During upgrades, when
* nodes may be running multiple versions simultaneously, this service prevents nodes running different versions of
* the software from communicating with each other, thus avoiding compatibility issues. For an equivalent cross-version
* compatible service, see {@link UnifiedClusterCommunicationService}.
*
* @see UnifiedClusterCommunicationService
*/
public interface ClusterCommunicationService extends ClusterCommunicator {
public interface ClusterCommunicationService {
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param subscriber message subscriber
* @param executor executor to use for running handler.
* @deprecated in Cardinal Release
*/
@Deprecated
void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
/**
* Broadcasts a message to all controller nodes.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param <M> message type
*/
<M> void broadcast(M message,
MessageSubject subject,
Function<M, byte[]> encoder);
/**
* Broadcasts a message to all controller nodes including self.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param <M> message type
*/
<M> void broadcastIncludeSelf(M message,
MessageSubject subject,
Function<M, byte[]> encoder);
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param toNodeId destination node identifier
* @param <M> message type
* @return future that is completed when the message is sent
*/
<M> CompletableFuture<Void> unicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
NodeId toNodeId);
/**
* Multicasts a message to a set of controller nodes.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param nodeIds recipient node identifiers
* @param <M> message type
*/
<M> void multicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Set<NodeId> nodeIds);
/**
* Sends a message and expects a reply.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding request to byte[]
* @param decoder function for decoding response from byte[]
* @param toNodeId recipient node identifier
* @param <M> request type
* @param <R> reply type
* @return reply future
*/
<M, R> CompletableFuture<R> sendAndReceive(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param decoder decoder for resurrecting incoming message
* @param handler handler function that processes the incoming message and produces a reply
* @param encoder encoder for serializing reply
* @param executor executor to run this handler on
* @param <M> incoming message type
* @param <R> reply message type
*/
<M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, R> handler,
Function<R, byte[]> encoder,
Executor executor);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param decoder decoder for resurrecting incoming message
* @param handler handler function that processes the incoming message and produces a reply
* @param encoder encoder for serializing reply
* @param <M> incoming message type
* @param <R> reply message type
*/
<M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, CompletableFuture<R>> handler,
Function<R, byte[]> encoder);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param decoder decoder to resurrecting incoming message
* @param handler handler for handling message
* @param executor executor to run this handler on
* @param <M> incoming message type
*/
<M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Consumer<M> handler,
Executor executor);
/**
* Removes a subscriber for the specified message subject.
*
* @param subject message subject
*/
void removeSubscriber(MessageSubject subject);
}

View File

@ -1,166 +0,0 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.messaging;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import org.onosproject.cluster.NodeId;
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicator {
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param subscriber message subscriber
* @param executor executor to use for running handler.
* @deprecated in Cardinal Release
*/
@Deprecated
void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
/**
* Broadcasts a message to all controller nodes.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param <M> message type
*/
<M> void broadcast(M message,
MessageSubject subject,
Function<M, byte[]> encoder);
/**
* Broadcasts a message to all controller nodes including self.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param <M> message type
*/
<M> void broadcastIncludeSelf(M message,
MessageSubject subject,
Function<M, byte[]> encoder);
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param toNodeId destination node identifier
* @param <M> message type
* @return future that is completed when the message is sent
*/
<M> CompletableFuture<Void> unicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
NodeId toNodeId);
/**
* Multicasts a message to a set of controller nodes.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding message to byte[]
* @param nodeIds recipient node identifiers
* @param <M> message type
*/
<M> void multicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Set<NodeId> nodeIds);
/**
* Sends a message and expects a reply.
*
* @param message message to send
* @param subject message subject
* @param encoder function for encoding request to byte[]
* @param decoder function for decoding response from byte[]
* @param toNodeId recipient node identifier
* @param <M> request type
* @param <R> reply type
* @return reply future
*/
<M, R> CompletableFuture<R> sendAndReceive(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param decoder decoder for resurrecting incoming message
* @param handler handler function that processes the incoming message and produces a reply
* @param encoder encoder for serializing reply
* @param executor executor to run this handler on
* @param <M> incoming message type
* @param <R> reply message type
*/
<M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, R> handler,
Function<R, byte[]> encoder,
Executor executor);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param decoder decoder for resurrecting incoming message
* @param handler handler function that processes the incoming message and produces a reply
* @param encoder encoder for serializing reply
* @param <M> incoming message type
* @param <R> reply message type
*/
<M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, CompletableFuture<R>> handler,
Function<R, byte[]> encoder);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param decoder decoder to resurrecting incoming message
* @param handler handler for handling message
* @param executor executor to run this handler on
* @param <M> incoming message type
*/
<M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Consumer<M> handler,
Executor executor);
/**
* Removes a subscriber for the specified message subject.
*
* @param subject message subject
*/
void removeSubscriber(MessageSubject subject);
}

View File

@ -1,31 +0,0 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.messaging;
import com.google.common.annotations.Beta;
/**
* Service for unified communication across controller nodes running multiple software versions.
* <p>
* This service supports communicating across nodes running different versions of the software simultaneously. During
* upgrades, when nodes may be running a mixture of versions, this service can be used to coordinate across those
* versions. But users of this service must be extremely careful to preserve backward/forward compatibility for
* messages sent across versions. Encoders and decoders used for messages sent/received on this service should
* support evolving schemas.
*/
@Beta
public interface UnifiedClusterCommunicationService extends ClusterCommunicator {
}

View File

@ -15,52 +15,47 @@
*/
package org.onosproject.cluster;
import java.util.Collection;
import java.util.Set;
import org.joda.time.DateTime;
import org.onosproject.core.Version;
/**
* Compatible cluster service adapter.
* Membership service adapter.
*/
public class UnifiedClusterServiceAdapter implements UnifiedClusterService {
public class MembershipServiceAdapter implements MembershipService {
@Override
public ControllerNode getLocalNode() {
public Member getLocalMember() {
return null;
}
@Override
public Set<ControllerNode> getNodes() {
public MembershipGroup getLocalGroup() {
return null;
}
@Override
public ControllerNode getNode(NodeId nodeId) {
public Set<Member> getMembers() {
return null;
}
@Override
public ControllerNode.State getState(NodeId nodeId) {
public Collection<MembershipGroup> getGroups() {
return null;
}
@Override
public Version getVersion(NodeId nodeId) {
public MembershipGroup getGroup(Version version) {
return null;
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
public Set<Member> getMembers(Version version) {
return null;
}
@Override
public void addListener(ClusterEventListener listener) {
}
@Override
public void removeListener(ClusterEventListener listener) {
public Member getMember(NodeId nodeId) {
return null;
}
}

View File

@ -15,28 +15,48 @@
*/
package org.onosproject.cluster.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataAdminService;
import org.onosproject.cluster.ClusterMetadataDiff;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.UnifiedClusterAdminService;
import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
import static org.slf4j.LoggerFactory.getLogger;
@ -46,122 +66,172 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
@Component(immediate = true)
@Service
public class ClusterManager implements ClusterService, ClusterAdminService {
public class ClusterManager
extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
implements ClusterService, ClusterAdminService {
public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
private static final int DEFAULT_PARTITION_SIZE = 3;
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private UnifiedClusterService clusterService;
private ClusterStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private UnifiedClusterAdminService clusterAdminService;
protected ClusterMetadataService clusterMetadataService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private VersionService versionService;
protected ClusterMetadataAdminService clusterMetadataAdminService;
private Version version;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SystemService systemService;
private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@Activate
public void activate() {
version = versionService.version();
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
clusterMetadataService.addListener(metadataListener);
processMetadata(clusterMetadataService.getClusterMetadata());
log.info("Started");
}
@Deactivate
public void deactivate() {
clusterMetadataService.removeListener(metadataListener);
store.unsetDelegate(delegate);
eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
}
@Override
public ControllerNode getLocalNode() {
checkPermission(CLUSTER_READ);
return clusterService.getLocalNode();
return store.getLocalNode();
}
@Override
public Set<ControllerNode> getNodes() {
checkPermission(CLUSTER_READ);
return clusterService.getNodes()
.stream()
.filter(node -> clusterService.getVersion(node.id()).equals(version))
.collect(Collectors.toSet());
return store.getNodes();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
checkPermission(CLUSTER_READ);
Version nodeVersion = clusterService.getVersion(nodeId);
if (nodeVersion != null && nodeVersion.equals(version)) {
return clusterService.getNode(nodeId);
}
return null;
checkNotNull(nodeId, INSTANCE_ID_NULL);
return store.getNode(nodeId);
}
@Override
public ControllerNode.State getState(NodeId nodeId) {
checkPermission(CLUSTER_READ);
Version nodeVersion = clusterService.getVersion(nodeId);
if (nodeVersion != null && nodeVersion.equals(version)) {
return clusterService.getState(nodeId);
}
return null;
checkNotNull(nodeId, INSTANCE_ID_NULL);
return store.getState(nodeId);
}
@Override
public Version getVersion(NodeId nodeId) {
checkPermission(CLUSTER_READ);
Version nodeVersion = clusterService.getVersion(nodeId);
if (nodeVersion != null && nodeVersion.equals(version)) {
return nodeVersion;
}
return null;
checkNotNull(nodeId, INSTANCE_ID_NULL);
return store.getVersion(nodeId);
}
@Override
public void markFullyStarted(boolean started) {
clusterAdminService.markFullyStarted(started);
store.markFullyStarted(started);
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
checkPermission(CLUSTER_READ);
Version nodeVersion = clusterService.getVersion(nodeId);
if (nodeVersion != null && nodeVersion.equals(version)) {
return clusterService.getLastUpdated(nodeId);
}
return null;
return store.getLastUpdated(nodeId);
}
@Override
public void formCluster(Set<ControllerNode> nodes) {
clusterAdminService.formCluster(nodes);
formCluster(nodes, DEFAULT_PARTITION_SIZE);
}
@Override
public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
clusterAdminService.formCluster(nodes, partitionSize);
}
checkNotNull(nodes, "Nodes cannot be null");
checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return clusterAdminService.addNode(nodeId, ip, tcpPort);
}
@Override
public void removeNode(NodeId nodeId) {
Version nodeVersion = clusterService.getVersion(nodeId);
if (nodeVersion != null && nodeVersion.equals(version)) {
clusterAdminService.removeNode(nodeId);
ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
clusterMetadataAdminService.setClusterMetadata(metadata);
try {
log.warn("Shutting down container for cluster reconfiguration!");
// Clean up persistent state associated with previous cluster configuration.
Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
systemService.reboot("now", SystemService.Swipe.NONE);
} catch (Exception e) {
log.error("Unable to reboot container", e);
}
}
@Override
public void addListener(ClusterEventListener listener) {
clusterService.addListener(listener);
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address cannot be null");
checkArgument(tcpPort > 5000, "TCP port must be > 5000");
return store.addNode(nodeId, ip, tcpPort);
}
@Override
public void removeListener(ClusterEventListener listener) {
clusterService.removeListener(listener);
public void removeNode(NodeId nodeId) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
store.removeNode(nodeId);
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements ClusterStoreDelegate {
@Override
public void notify(ClusterEvent event) {
post(event);
}
}
private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
List<ControllerNode> sorted = new ArrayList<>(nodes);
Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
Set<Partition> partitions = Sets.newHashSet();
// add partitions
int length = nodes.size();
int count = Math.min(partitionSize, length);
for (int i = 0; i < length; i++) {
int index = i;
Set<NodeId> set = new HashSet<>(count);
for (int j = 0; j < count; j++) {
set.add(sorted.get((i + j) % length).id());
}
partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
}
return partitions;
}
/**
* Processes metadata by adding and removing nodes from the cluster.
*/
private synchronized void processMetadata(ClusterMetadata metadata) {
try {
ClusterMetadataDiff examiner =
new ClusterMetadataDiff(currentMetadata.get(), metadata);
examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
examiner.nodesRemoved().forEach(this::removeNode);
} finally {
currentMetadata.set(metadata);
}
}
private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@Override
public void event(ClusterMetadataEvent event) {
processMetadata(event.subject());
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster.impl;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Member;
import org.onosproject.cluster.MembershipGroup;
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.Version;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Cluster membership manager.
*/
@Component(immediate = true)
@Service
public class MembershipManager implements MembershipService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private Member localMember;
@Activate
public void activate() {
localMember = new Member(
clusterService.getLocalNode().id(),
clusterService.getVersion(clusterService.getLocalNode().id()));
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
private Member toMemberId(ControllerNode node) {
return new Member(node.id(), clusterService.getVersion(node.id()));
}
@Override
public Member getLocalMember() {
return localMember;
}
@Override
public MembershipGroup getLocalGroup() {
return getGroup(getLocalMember().version());
}
@Override
public Set<Member> getMembers() {
return clusterService.getNodes().stream()
.map(this::toMemberId)
.collect(Collectors.toSet());
}
@Override
public Collection<MembershipGroup> getGroups() {
Map<Version, Set<Member>> groups = Maps.newHashMap();
clusterService.getNodes().stream()
.map(this::toMemberId)
.forEach(member ->
groups.computeIfAbsent(member.version(), k -> Sets.newHashSet()).add(member));
return Maps.transformEntries(groups, MembershipGroup::new).values();
}
@Override
public MembershipGroup getGroup(Version version) {
return new MembershipGroup(version, getMembers(version));
}
@Override
public Set<Member> getMembers(Version version) {
return getMembers()
.stream()
.filter(m -> Objects.equals(m.version(), version))
.collect(Collectors.toSet());
}
@Override
public Member getMember(NodeId nodeId) {
ControllerNode node = clusterService.getNode(nodeId);
return node != null ? toMemberId(node) : null;
}
}

View File

@ -1,237 +0,0 @@
/*
* Copyright 2014-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataAdminService;
import org.onosproject.cluster.ClusterMetadataDiff;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.UnifiedClusterAdminService;
import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the cluster service.
*/
@Component(immediate = true)
@Service
public class UnifiedClusterManager
extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
implements UnifiedClusterService, UnifiedClusterAdminService {
public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
private static final int DEFAULT_PARTITION_SIZE = 3;
private final Logger log = getLogger(getClass());
private ClusterStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService clusterMetadataService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataAdminService clusterMetadataAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SystemService systemService;
private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@Activate
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
clusterMetadataService.addListener(metadataListener);
processMetadata(clusterMetadataService.getClusterMetadata());
log.info("Started");
}
@Deactivate
public void deactivate() {
clusterMetadataService.removeListener(metadataListener);
store.unsetDelegate(delegate);
eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
}
@Override
public ControllerNode getLocalNode() {
checkPermission(CLUSTER_READ);
return store.getLocalNode();
}
@Override
public Set<ControllerNode> getNodes() {
checkPermission(CLUSTER_READ);
return store.getNodes();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
checkPermission(CLUSTER_READ);
checkNotNull(nodeId, INSTANCE_ID_NULL);
return store.getNode(nodeId);
}
@Override
public ControllerNode.State getState(NodeId nodeId) {
checkPermission(CLUSTER_READ);
checkNotNull(nodeId, INSTANCE_ID_NULL);
return store.getState(nodeId);
}
@Override
public Version getVersion(NodeId nodeId) {
checkPermission(CLUSTER_READ);
checkNotNull(nodeId, INSTANCE_ID_NULL);
return store.getVersion(nodeId);
}
@Override
public void markFullyStarted(boolean started) {
store.markFullyStarted(started);
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
checkPermission(CLUSTER_READ);
return store.getLastUpdated(nodeId);
}
@Override
public void formCluster(Set<ControllerNode> nodes) {
formCluster(nodes, DEFAULT_PARTITION_SIZE);
}
@Override
public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
checkNotNull(nodes, "Nodes cannot be null");
checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
clusterMetadataAdminService.setClusterMetadata(metadata);
try {
log.warn("Shutting down container for cluster reconfiguration!");
// Clean up persistent state associated with previous cluster configuration.
Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
systemService.reboot("now", SystemService.Swipe.NONE);
} catch (Exception e) {
log.error("Unable to reboot container", e);
}
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address cannot be null");
checkArgument(tcpPort > 5000, "TCP port must be > 5000");
return store.addNode(nodeId, ip, tcpPort);
}
@Override
public void removeNode(NodeId nodeId) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
store.removeNode(nodeId);
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements ClusterStoreDelegate {
@Override
public void notify(ClusterEvent event) {
post(event);
}
}
private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
List<ControllerNode> sorted = new ArrayList<>(nodes);
Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
Set<Partition> partitions = Sets.newHashSet();
// add partitions
int length = nodes.size();
int count = Math.min(partitionSize, length);
for (int i = 0; i < length; i++) {
int index = i;
Set<NodeId> set = new HashSet<>(count);
for (int j = 0; j < count; j++) {
set.add(sorted.get((i + j) % length).id());
}
partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
}
return partitions;
}
/**
* Processes metadata by adding and removing nodes from the cluster.
*/
private synchronized void processMetadata(ClusterMetadata metadata) {
try {
ClusterMetadataDiff examiner =
new ClusterMetadataDiff(currentMetadata.get(), metadata);
examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
examiner.nodesRemoved().forEach(this::removeNode);
} finally {
currentMetadata.set(metadata);
}
}
private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@Override
public void event(ClusterMetadataEvent event) {
processMetadata(event.subject());
}
}
}

View File

@ -28,9 +28,10 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
@ -70,7 +71,10 @@ public class UpgradeManager
protected CoordinationService coordinationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UnifiedClusterService clusterService;
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MembershipService membershipService;
private Version localVersion;
private AtomicValue<Upgrade> state;
@ -243,13 +247,8 @@ public class UpgradeManager
}
// Determine whether any nodes have not been upgraded to the target version.
boolean upgradeComplete = clusterService.getNodes()
.stream()
.allMatch(node -> {
ControllerNode.State state = clusterService.getState(node.id());
Version version = clusterService.getVersion(node.id());
return state.isActive() && version != null && version.equals(upgraded.target());
});
boolean upgradeComplete = membershipService.getGroups().size() == 1
&& membershipService.getLocalGroup().version().equals(upgraded.target());
// If some nodes have not yet been upgraded, throw an exception.
if (!upgradeComplete) {
@ -333,13 +332,8 @@ public class UpgradeManager
}
// Determine whether any nodes are still running the target version.
boolean rollbackComplete = clusterService.getNodes()
.stream()
.allMatch(node -> {
ControllerNode.State state = clusterService.getState(node.id());
Version version = clusterService.getVersion(node.id());
return state.isActive() && version != null && version.equals(upgraded.source());
});
boolean rollbackComplete = membershipService.getGroups().size() == 1
&& membershipService.getLocalGroup().version().equals(upgraded.source());
// If some nodes have not yet been downgraded, throw an exception.
if (!rollbackComplete) {

View File

@ -16,17 +16,24 @@
package org.onosproject.upgrade.impl;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.UnifiedClusterServiceAdapter;
import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.Member;
import org.onosproject.cluster.MembershipGroup;
import org.onosproject.cluster.MembershipServiceAdapter;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.Version;
import org.onosproject.core.VersionServiceAdapter;
@ -59,7 +66,29 @@ public class UpgradeManagerTest {
@SuppressWarnings("unchecked")
private UpgradeManager createUpgradeManager(Version version, Upgrade state, List<Version> versions) {
UpgradeManager upgradeManager = new UpgradeManager();
upgradeManager.clusterService = new UnifiedClusterServiceAdapter() {
upgradeManager.membershipService = new MembershipServiceAdapter() {
@Override
public MembershipGroup getLocalGroup() {
return getGroups()
.stream()
.filter(group -> group.version().equals(version))
.findFirst()
.get();
}
@Override
public Collection<MembershipGroup> getGroups() {
AtomicInteger nodeCounter = new AtomicInteger();
Map<Version, Set<Member>> groups = Maps.newHashMap();
versions.stream().forEach(version -> {
groups.computeIfAbsent(version, k -> Sets.newHashSet())
.add(new Member(NodeId.nodeId(String.valueOf(nodeCounter.getAndIncrement())), version));
});
return Maps.transformEntries(groups, MembershipGroup::new).values();
}
};
upgradeManager.clusterService = new ClusterServiceAdapter() {
@Override
public Set<ControllerNode> getNodes() {
AtomicInteger nodeCounter = new AtomicInteger();
@ -83,11 +112,6 @@ public class UpgradeManagerTest {
.orElse(null);
}
@Override
public ControllerNode.State getState(NodeId nodeId) {
return ControllerNode.State.READY;
}
@Override
public Version getVersion(NodeId nodeId) {
return versions.get(Integer.parseInt(nodeId.id()));

View File

@ -1,347 +0,0 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.messaging.impl;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.Tools;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.utils.MeteringAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@Component(componentAbstract = true)
public abstract class AbstractClusterCommunicationManager
implements ClusterCommunicationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
private static final String PRIMITIVE_NAME = "clusterCommunication";
private static final String SUBJECT_PREFIX = "subject";
private static final String ENDPOINT_PREFIX = "endpoint";
private static final String SERIALIZING = "serialization";
private static final String DESERIALIZING = "deserialization";
private static final String NODE_PREFIX = "node:";
private static final String ROUND_TRIP_SUFFIX = ".rtt";
private static final String ONE_WAY_SUFFIX = ".oneway";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UnifiedClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
private NodeId localNodeId;
/**
* Returns the type for the given message subject.
*
* @param subject the type for the given message subject
* @return the message subject
*/
protected abstract String getType(MessageSubject subject);
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public <M> void broadcast(M message,
MessageSubject subject,
Function<M, byte[]> encoder) {
checkPermission(CLUSTER_WRITE);
multicast(message,
subject,
encoder,
clusterService.getNodes()
.stream()
.filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
.map(ControllerNode::id)
.collect(Collectors.toSet()));
}
@Override
public <M> void broadcastIncludeSelf(M message,
MessageSubject subject,
Function<M, byte[]> encoder) {
checkPermission(CLUSTER_WRITE);
multicast(message,
subject,
encoder,
clusterService.getNodes()
.stream()
.map(ControllerNode::id)
.collect(Collectors.toSet()));
}
@Override
public <M> CompletableFuture<Void> unicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
NodeId toNodeId) {
checkPermission(CLUSTER_WRITE);
try {
byte[] payload = new ClusterMessage(
localNodeId,
subject,
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
).getBytes();
return doUnicast(subject, payload, toNodeId);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
@Override
public <M> void multicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Set<NodeId> nodes) {
checkPermission(CLUSTER_WRITE);
byte[] payload = new ClusterMessage(
localNodeId,
subject,
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
.getBytes();
nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
}
@Override
public <M, R> CompletableFuture<R> sendAndReceive(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
NodeId toNodeId) {
checkPermission(CLUSTER_WRITE);
try {
ClusterMessage envelope = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
apply(message));
return sendAndReceive(subject, envelope.getBytes(), toNodeId).
thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
return messagingService.sendAsync(nodeEp, getType(subject), payload).whenComplete((r, e) -> context.stop(e));
}
private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
MeteringAgent.Context epContext = endpointMeteringAgent.
startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
MeteringAgent.Context subjectContext = subjectMeteringAgent.
startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
return messagingService.sendAndReceive(nodeEp, getType(subject), payload).
whenComplete((bytes, throwable) -> {
subjectContext.stop(throwable);
epContext.stop(throwable);
});
}
@Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(getType(subject),
new InternalClusterMessageHandler(subscriber),
executor);
}
@Override
public void removeSubscriber(MessageSubject subject) {
checkPermission(CLUSTER_WRITE);
messagingService.unregisterHandler(getType(subject));
}
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, R> handler,
Function<R, byte[]> encoder,
Executor executor) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(getType(subject),
new InternalMessageResponder<M, R>(decoder, encoder, m -> {
CompletableFuture<R> responseFuture = new CompletableFuture<>();
executor.execute(() -> {
try {
responseFuture.complete(handler.apply(m));
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
});
return responseFuture;
}));
}
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, CompletableFuture<R>> handler,
Function<R, byte[]> encoder) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(getType(subject),
new InternalMessageResponder<>(decoder, encoder, handler));
}
@Override
public <M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Consumer<M> handler,
Executor executor) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(getType(subject),
new InternalMessageConsumer<>(decoder, handler),
executor);
}
/**
* Performs the timed function, returning the value it would while timing the operation.
*
* @param timedFunction the function to be timed
* @param meter the metering agent to be used to time the function
* @param opName the opname to be used when starting the meter
* @param <A> The param type of the function
* @param <B> The return type of the function
* @return the value returned by the timed function
*/
private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
MeteringAgent meter, String opName) {
checkNotNull(timedFunction);
checkNotNull(meter);
checkNotNull(opName);
return new Function<A, B>() {
@Override
public B apply(A a) {
final MeteringAgent.Context context = meter.startTimer(opName);
B result = null;
try {
result = timedFunction.apply(a);
context.stop(null);
return result;
} catch (Exception e) {
context.stop(e);
Throwables.propagate(e);
return null;
}
}
};
}
private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
private ClusterMessageHandler handler;
public InternalClusterMessageHandler(ClusterMessageHandler handler) {
this.handler = handler;
}
@Override
public byte[] apply(Endpoint sender, byte[] bytes) {
ClusterMessage message = ClusterMessage.fromBytes(bytes);
handler.handle(message);
return message.response();
}
}
private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
private final Function<M, CompletableFuture<R>> handler;
public InternalMessageResponder(Function<byte[], M> decoder,
Function<R, byte[]> encoder,
Function<M, CompletableFuture<R>> handler) {
this.decoder = decoder;
this.encoder = encoder;
this.handler = handler;
}
@Override
public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
apply(ClusterMessage.fromBytes(bytes).payload())).
thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
}
}
private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
private final Function<byte[], M> decoder;
private final Consumer<M> consumer;
public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
this.decoder = decoder;
this.consumer = consumer;
}
@Override
public void accept(Endpoint sender, byte[] bytes) {
consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
apply(ClusterMessage.fromBytes(bytes).payload()));
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2014-present Open Networking Foundation
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,24 +15,326 @@
*/
package org.onosproject.store.cluster.messaging.impl;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.VersionService;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.utils.MeteringAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@Component(immediate = true)
@Service
public class ClusterCommunicationManager extends AbstractClusterCommunicationManager {
public class ClusterCommunicationManager implements ClusterCommunicationService {
private static final char VERSION_SEP = '-';
private final Logger log = LoggerFactory.getLogger(getClass());
private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
private static final String PRIMITIVE_NAME = "clusterCommunication";
private static final String SUBJECT_PREFIX = "subject";
private static final String ENDPOINT_PREFIX = "endpoint";
private static final String SERIALIZING = "serialization";
private static final String DESERIALIZING = "deserialization";
private static final String NODE_PREFIX = "node:";
private static final String ROUND_TRIP_SUFFIX = ".rtt";
private static final String ONE_WAY_SUFFIX = ".oneway";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private VersionService versionService;
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
private NodeId localNodeId;
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
protected String getType(MessageSubject subject) {
return subject.value() + VERSION_SEP + versionService.version().toString();
public <M> void broadcast(M message,
MessageSubject subject,
Function<M, byte[]> encoder) {
checkPermission(CLUSTER_WRITE);
multicast(message,
subject,
encoder,
clusterService.getNodes()
.stream()
.filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
.map(ControllerNode::id)
.collect(Collectors.toSet()));
}
@Override
public <M> void broadcastIncludeSelf(M message,
MessageSubject subject,
Function<M, byte[]> encoder) {
checkPermission(CLUSTER_WRITE);
multicast(message,
subject,
encoder,
clusterService.getNodes()
.stream()
.map(ControllerNode::id)
.collect(Collectors.toSet()));
}
@Override
public <M> CompletableFuture<Void> unicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
NodeId toNodeId) {
checkPermission(CLUSTER_WRITE);
try {
byte[] payload = new ClusterMessage(
localNodeId,
subject,
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
).getBytes();
return doUnicast(subject, payload, toNodeId);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
@Override
public <M> void multicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Set<NodeId> nodes) {
checkPermission(CLUSTER_WRITE);
byte[] payload = new ClusterMessage(
localNodeId,
subject,
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
.getBytes();
nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
}
@Override
public <M, R> CompletableFuture<R> sendAndReceive(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
NodeId toNodeId) {
checkPermission(CLUSTER_WRITE);
try {
ClusterMessage envelope = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
apply(message));
return sendAndReceive(subject, envelope.getBytes(), toNodeId).
thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e));
}
private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
MeteringAgent.Context epContext = endpointMeteringAgent.
startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
MeteringAgent.Context subjectContext = subjectMeteringAgent.
startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
return messagingService.sendAndReceive(nodeEp, subject.toString(), payload).
whenComplete((bytes, throwable) -> {
subjectContext.stop(throwable);
epContext.stop(throwable);
});
}
@Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(subject.toString(),
new InternalClusterMessageHandler(subscriber),
executor);
}
@Override
public void removeSubscriber(MessageSubject subject) {
checkPermission(CLUSTER_WRITE);
messagingService.unregisterHandler(subject.toString());
}
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, R> handler,
Function<R, byte[]> encoder,
Executor executor) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(subject.toString(),
new InternalMessageResponder<M, R>(decoder, encoder, m -> {
CompletableFuture<R> responseFuture = new CompletableFuture<>();
executor.execute(() -> {
try {
responseFuture.complete(handler.apply(m));
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
});
return responseFuture;
}));
}
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Function<M, CompletableFuture<R>> handler,
Function<R, byte[]> encoder) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(subject.toString(),
new InternalMessageResponder<>(decoder, encoder, handler));
}
@Override
public <M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Consumer<M> handler,
Executor executor) {
checkPermission(CLUSTER_WRITE);
messagingService.registerHandler(subject.toString(),
new InternalMessageConsumer<>(decoder, handler),
executor);
}
/**
* Performs the timed function, returning the value it would while timing the operation.
*
* @param timedFunction the function to be timed
* @param meter the metering agent to be used to time the function
* @param opName the opname to be used when starting the meter
* @param <A> The param type of the function
* @param <B> The return type of the function
* @return the value returned by the timed function
*/
private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
MeteringAgent meter, String opName) {
checkNotNull(timedFunction);
checkNotNull(meter);
checkNotNull(opName);
return new Function<A, B>() {
@Override
public B apply(A a) {
final MeteringAgent.Context context = meter.startTimer(opName);
B result = null;
try {
result = timedFunction.apply(a);
context.stop(null);
return result;
} catch (Exception e) {
context.stop(e);
Throwables.propagate(e);
return null;
}
}
};
}
private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
private ClusterMessageHandler handler;
public InternalClusterMessageHandler(ClusterMessageHandler handler) {
this.handler = handler;
}
@Override
public byte[] apply(Endpoint sender, byte[] bytes) {
ClusterMessage message = ClusterMessage.fromBytes(bytes);
handler.handle(message);
return message.response();
}
}
private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
private final Function<M, CompletableFuture<R>> handler;
public InternalMessageResponder(Function<byte[], M> decoder,
Function<R, byte[]> encoder,
Function<M, CompletableFuture<R>> handler) {
this.decoder = decoder;
this.encoder = encoder;
this.handler = handler;
}
@Override
public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
apply(ClusterMessage.fromBytes(bytes).payload())).
thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
}
}
private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
private final Function<byte[], M> decoder;
private final Consumer<M> consumer;
public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
this.decoder = decoder;
this.consumer = consumer;
}
@Override
public void accept(Endpoint sender, byte[] bytes) {
consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
apply(ClusterMessage.fromBytes(bytes).payload()));
}
}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@Component(immediate = true)
@Service
public class UnifiedClusterCommunicationManager
extends AbstractClusterCommunicationManager
implements UnifiedClusterCommunicationService {
@Override
protected String getType(MessageSubject subject) {
return subject.value();
}
}

View File

@ -25,12 +25,12 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
@ -70,10 +70,10 @@ public class CoordinationManager implements CoordinationService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UnifiedClusterService clusterService;
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UnifiedClusterCommunicationService clusterCommunicator;
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;

View File

@ -15,21 +15,21 @@
*/
package org.onosproject.store.primitives.impl;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@ -38,8 +38,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class EventuallyConsistentMapBuilderImpl<K, V>
implements EventuallyConsistentMapBuilder<K, V> {
private final MembershipService clusterService;
private final ClusterCommunicator clusterCommunicator;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private String name;
private KryoNamespace serializer;
@ -64,9 +64,10 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
* @param clusterCommunicator cluster communication service
* @param persistenceService persistence service
*/
public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
ClusterCommunicator clusterCommunicator,
PersistenceService persistenceService) {
public EventuallyConsistentMapBuilderImpl(
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
PersistenceService persistenceService) {
this.persistenceService = persistenceService;
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);

View File

@ -15,34 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -67,6 +39,34 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@ -86,8 +86,8 @@ public class EventuallyConsistentMapImpl<K, V>
private final Map<K, MapValue<V>> items;
private final MembershipService clusterService;
private final ClusterCommunicator clusterCommunicator;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final Serializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
@ -162,8 +162,8 @@ public class EventuallyConsistentMapImpl<K, V>
* @param persistenceService persistence service
*/
EventuallyConsistentMapImpl(String mapName,
MembershipService clusterService,
ClusterCommunicator clusterCommunicator,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace ns,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,

View File

@ -45,7 +45,7 @@ import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionEvent;
@ -71,7 +71,7 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UnifiedClusterCommunicationService clusterCommunicator;
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService metadataService;
@ -103,24 +103,24 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
Version targetVersion = upgradeService.getState().target();
currentClusterMetadata.get()
.getPartitions()
.forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
partition,
sourceVersion,
null,
clusterCommunicator,
clusterService,
new File(System.getProperty("karaf.data") +
"/partitions/" + sourceVersion + "/" + partition.getId()))));
currentClusterMetadata.get()
.getPartitions()
.forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
partition,
targetVersion,
sourceVersion,
clusterCommunicator,
clusterService,
new File(System.getProperty("karaf.data") +
"/partitions/" + targetVersion + "/" + partition.getId()))));
.forEach(partition -> {
inactivePartitions.put(partition.getId(), new StoragePartition(
partition,
sourceVersion,
null,
clusterCommunicator,
clusterService,
new File(System.getProperty("karaf.data") +
"/partitions/" + sourceVersion + "/" + partition.getId())));
activePartitions.put(partition.getId(), new StoragePartition(
partition,
targetVersion,
sourceVersion,
clusterCommunicator,
clusterService,
new File(System.getProperty("karaf.data") +
"/partitions/" + targetVersion + "/" + partition.getId())));
});
// We have to fork existing partitions before we can start inactive partition servers to
// avoid duplicate message handlers when both servers are running.

View File

@ -40,7 +40,7 @@ import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.Serializer;
/**
@ -51,7 +51,7 @@ public class RaftClientCommunicator extends RaftCommunicator implements RaftClie
public RaftClientCommunicator(
String prefix,
Serializer serializer,
ClusterCommunicator clusterCommunicator) {
ClusterCommunicationService clusterCommunicator) {
super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}

View File

@ -22,7 +22,7 @@ import java.util.concurrent.CompletionException;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.cluster.MemberId;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.service.Serializer;
@ -35,12 +35,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
public abstract class RaftCommunicator {
protected final RaftMessageContext context;
protected final Serializer serializer;
protected final ClusterCommunicator clusterCommunicator;
protected final ClusterCommunicationService clusterCommunicator;
public RaftCommunicator(
RaftMessageContext context,
Serializer serializer,
ClusterCommunicator clusterCommunicator) {
ClusterCommunicationService clusterCommunicator) {
this.context = checkNotNull(context, "context cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");

View File

@ -57,7 +57,6 @@ import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;
/**
@ -68,7 +67,7 @@ public class RaftServerCommunicator extends RaftCommunicator implements RaftServ
public RaftServerCommunicator(
String prefix,
Serializer serializer,
ClusterCommunicator clusterCommunicator) {
ClusterCommunicationService clusterCommunicator) {
super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}

View File

@ -29,10 +29,10 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.PartitionId;
import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionService;
@ -81,10 +81,10 @@ public class StorageManager implements StorageService, StorageAdminService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UnifiedClusterService clusterService;
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected UnifiedClusterCommunicationService clusterCommunicator;
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;

View File

@ -29,12 +29,12 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.service.RaftService;
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@ -54,8 +54,8 @@ import org.onosproject.store.service.Serializer;
public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final UnifiedClusterCommunicationService clusterCommunicator;
private final MembershipService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final ClusterService clusterService;
private final Version version;
private final Version source;
private final File dataFolder;
@ -85,8 +85,8 @@ public class StoragePartition implements Managed<StoragePartition> {
Partition partition,
Version version,
Version source,
UnifiedClusterCommunicationService clusterCommunicator,
MembershipService clusterService,
ClusterCommunicationService clusterCommunicator,
ClusterService clusterService,
File dataFolder) {
this.partition = partition;
this.version = version;
@ -191,6 +191,10 @@ public class StoragePartition implements Managed<StoragePartition> {
return source != null ?
clusterService.getNodes()
.stream()
.filter(node -> {
Version nodeVersion = clusterService.getVersion(node.id());
return nodeVersion != null && nodeVersion.equals(version);
})
.map(node -> MemberId.from(node.id().id()))
.collect(Collectors.toList()) :
Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
@ -202,6 +206,10 @@ public class StoragePartition implements Managed<StoragePartition> {
} else {
return clusterService.getNodes()
.stream()
.filter(node -> {
Version nodeVersion = clusterService.getVersion(node.id());
return nodeVersion != null && nodeVersion.equals(version);
})
.map(node -> MemberId.from(node.id().id()))
.collect(Collectors.toList());
}
@ -231,13 +239,10 @@ public class StoragePartition implements Managed<StoragePartition> {
clusterCommunicator);
CompletableFuture<Void> future;
if (clusterService.getNodes().size() == 1) {
if (getMemberIds().size() == 1) {
future = server.fork(version);
} else {
future = server.join(clusterService.getNodes().stream()
.filter(node -> !node.id().equals(localNodeId))
.map(node -> MemberId.from(node.id().id()))
.collect(Collectors.toList()));
future = server.join(getMemberIds());
}
return future.thenRun(() -> this.server = server);
}

View File

@ -28,7 +28,7 @@ import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import org.onosproject.core.Version;
import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
@ -49,13 +49,13 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
private final MemberId localMemberId;
private final StoragePartition partition;
private final UnifiedClusterCommunicationService clusterCommunicator;
private final ClusterCommunicationService clusterCommunicator;
private RaftServer server;
public StoragePartitionServer(
StoragePartition partition,
MemberId localMemberId,
UnifiedClusterCommunicationService clusterCommunicator) {
ClusterCommunicationService clusterCommunicator) {
this.partition = partition;
this.localMemberId = localMemberId;
this.clusterCommunicator = clusterCommunicator;