mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-11-08 04:01:53 +01:00
Use partitionId instead of clusterName in CopycatTransport
Change-Id: I8e7ab3863a36944ac9e48e187037fb43695ebde3
This commit is contained in:
parent
dbe8a813b8
commit
f778c96889
@ -17,6 +17,7 @@ package org.onosproject.store.primitives.impl;
|
|||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
import org.onosproject.cluster.PartitionId;
|
||||||
import org.onosproject.store.cluster.messaging.MessagingService;
|
import org.onosproject.store.cluster.messaging.MessagingService;
|
||||||
|
|
||||||
import io.atomix.catalyst.transport.Client;
|
import io.atomix.catalyst.transport.Client;
|
||||||
@ -48,25 +49,25 @@ public class CopycatTransport implements Transport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private final Mode mode;
|
private final Mode mode;
|
||||||
private final String clusterName;
|
private final PartitionId partitionId;
|
||||||
private final MessagingService messagingService;
|
private final MessagingService messagingService;
|
||||||
|
|
||||||
public CopycatTransport(Mode mode, String clusterName, MessagingService messagingService) {
|
public CopycatTransport(Mode mode, PartitionId partitionId, MessagingService messagingService) {
|
||||||
this.mode = checkNotNull(mode);
|
this.mode = checkNotNull(mode);
|
||||||
this.clusterName = checkNotNull(clusterName);
|
this.partitionId = checkNotNull(partitionId);
|
||||||
this.messagingService = checkNotNull(messagingService);
|
this.messagingService = checkNotNull(messagingService);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Client client() {
|
public Client client() {
|
||||||
return new CopycatTransportClient(clusterName,
|
return new CopycatTransportClient(partitionId,
|
||||||
messagingService,
|
messagingService,
|
||||||
mode);
|
mode);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Server server() {
|
public Server server() {
|
||||||
return new CopycatTransportServer(clusterName,
|
return new CopycatTransportServer(partitionId,
|
||||||
messagingService);
|
messagingService);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,7 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import org.apache.commons.lang.math.RandomUtils;
|
import org.apache.commons.lang.math.RandomUtils;
|
||||||
|
import org.onosproject.cluster.PartitionId;
|
||||||
import org.onosproject.store.cluster.messaging.MessagingService;
|
import org.onosproject.store.cluster.messaging.MessagingService;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
@ -34,13 +36,13 @@ import io.atomix.catalyst.util.concurrent.ThreadContext;
|
|||||||
*/
|
*/
|
||||||
public class CopycatTransportClient implements Client {
|
public class CopycatTransportClient implements Client {
|
||||||
|
|
||||||
private final String clusterName;
|
private final PartitionId partitionId;
|
||||||
private final MessagingService messagingService;
|
private final MessagingService messagingService;
|
||||||
private final CopycatTransport.Mode mode;
|
private final CopycatTransport.Mode mode;
|
||||||
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
|
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
|
||||||
|
|
||||||
CopycatTransportClient(String clusterName, MessagingService messagingService, CopycatTransport.Mode mode) {
|
CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
|
||||||
this.clusterName = checkNotNull(clusterName);
|
this.partitionId = checkNotNull(partitionId);
|
||||||
this.messagingService = checkNotNull(messagingService);
|
this.messagingService = checkNotNull(messagingService);
|
||||||
this.mode = checkNotNull(mode);
|
this.mode = checkNotNull(mode);
|
||||||
}
|
}
|
||||||
@ -51,7 +53,7 @@ public class CopycatTransportClient implements Client {
|
|||||||
CopycatTransportConnection connection = new CopycatTransportConnection(
|
CopycatTransportConnection connection = new CopycatTransportConnection(
|
||||||
nextConnectionId(),
|
nextConnectionId(),
|
||||||
CopycatTransport.Mode.CLIENT,
|
CopycatTransport.Mode.CLIENT,
|
||||||
clusterName,
|
partitionId,
|
||||||
remoteAddress,
|
remoteAddress,
|
||||||
messagingService,
|
messagingService,
|
||||||
context);
|
context);
|
||||||
|
|||||||
@ -32,6 +32,7 @@ import java.util.function.Consumer;
|
|||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.onlab.packet.IpAddress;
|
import org.onlab.packet.IpAddress;
|
||||||
import org.onlab.util.Tools;
|
import org.onlab.util.Tools;
|
||||||
|
import org.onosproject.cluster.PartitionId;
|
||||||
import org.onosproject.store.cluster.messaging.Endpoint;
|
import org.onosproject.store.cluster.messaging.Endpoint;
|
||||||
import org.onosproject.store.cluster.messaging.MessagingService;
|
import org.onosproject.store.cluster.messaging.MessagingService;
|
||||||
|
|
||||||
@ -77,7 +78,7 @@ public class CopycatTransportConnection implements Connection {
|
|||||||
|
|
||||||
CopycatTransportConnection(long connectionId,
|
CopycatTransportConnection(long connectionId,
|
||||||
CopycatTransport.Mode mode,
|
CopycatTransport.Mode mode,
|
||||||
String clusterName,
|
PartitionId partitionId,
|
||||||
Address address,
|
Address address,
|
||||||
MessagingService messagingService,
|
MessagingService messagingService,
|
||||||
ThreadContext context) {
|
ThreadContext context) {
|
||||||
@ -86,11 +87,11 @@ public class CopycatTransportConnection implements Connection {
|
|||||||
this.remoteAddress = checkNotNull(address);
|
this.remoteAddress = checkNotNull(address);
|
||||||
this.messagingService = checkNotNull(messagingService);
|
this.messagingService = checkNotNull(messagingService);
|
||||||
if (mode == CopycatTransport.Mode.CLIENT) {
|
if (mode == CopycatTransport.Mode.CLIENT) {
|
||||||
this.outboundMessageSubject = String.format("onos-copycat-%s", clusterName);
|
this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
|
||||||
this.inboundMessageSubject = String.format("onos-copycat-%s-%d", clusterName, connectionId);
|
this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
|
||||||
} else {
|
} else {
|
||||||
this.outboundMessageSubject = String.format("onos-copycat-%s-%d", clusterName, connectionId);
|
this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
|
||||||
this.inboundMessageSubject = String.format("onos-copycat-%s", clusterName);
|
this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
|
||||||
}
|
}
|
||||||
this.context = checkNotNull(context);
|
this.context = checkNotNull(context);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -29,6 +29,7 @@ import java.util.function.Consumer;
|
|||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.onlab.util.Tools;
|
import org.onlab.util.Tools;
|
||||||
|
import org.onosproject.cluster.PartitionId;
|
||||||
import org.onosproject.store.cluster.messaging.MessagingService;
|
import org.onosproject.store.cluster.messaging.MessagingService;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
@ -46,15 +47,15 @@ public class CopycatTransportServer implements Server {
|
|||||||
|
|
||||||
private final AtomicBoolean listening = new AtomicBoolean(false);
|
private final AtomicBoolean listening = new AtomicBoolean(false);
|
||||||
private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
|
private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
|
||||||
private final String clusterName;
|
private final PartitionId partitionId;
|
||||||
private final MessagingService messagingService;
|
private final MessagingService messagingService;
|
||||||
private final String messageSubject;
|
private final String messageSubject;
|
||||||
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
|
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
|
||||||
|
|
||||||
CopycatTransportServer(String clusterName, MessagingService messagingService) {
|
CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
|
||||||
this.clusterName = checkNotNull(clusterName);
|
this.partitionId = checkNotNull(partitionId);
|
||||||
this.messagingService = checkNotNull(messagingService);
|
this.messagingService = checkNotNull(messagingService);
|
||||||
this.messageSubject = String.format("onos-copycat-%s", clusterName);
|
this.messageSubject = String.format("onos-copycat-%s", partitionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -78,7 +79,7 @@ public class CopycatTransportServer implements Server {
|
|||||||
newConnection.set(true);
|
newConnection.set(true);
|
||||||
return new CopycatTransportConnection(connectionId,
|
return new CopycatTransportConnection(connectionId,
|
||||||
CopycatTransport.Mode.SERVER,
|
CopycatTransport.Mode.SERVER,
|
||||||
clusterName,
|
partitionId,
|
||||||
senderAddress,
|
senderAddress,
|
||||||
messagingService,
|
messagingService,
|
||||||
getOrCreateContext(context));
|
getOrCreateContext(context));
|
||||||
@ -114,6 +115,6 @@ public class CopycatTransportServer implements Server {
|
|||||||
if (context != null) {
|
if (context != null) {
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
return new SingleThreadContext("copycat-transport-server-" + clusterName, parentContext.serializer().clone());
|
return new SingleThreadContext("copycat-transport-server-" + partitionId, parentContext.serializer().clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user