KryoSerializationService -> Serializer

- no longer a shared OSGi service.

Change-Id: Ie2b2320e92685cd515b30ffc472e3a02149a5bbd
This commit is contained in:
Yuta HIGUCHI 2014-10-07 09:23:43 -07:00
parent 53a285d5ee
commit 672488d8cf
13 changed files with 36 additions and 79 deletions

View File

@ -36,8 +36,6 @@ import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService; import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager; import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore; import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix; import org.onlab.packet.IpPrefix;
import java.util.ArrayList; import java.util.ArrayList;
@ -95,7 +93,6 @@ public class DistributedDeviceManagerTest {
private DistributedDeviceStore dstore; private DistributedDeviceStore dstore;
private TestMastershipManager masterManager; private TestMastershipManager masterManager;
private EventDeliveryService eventService; private EventDeliveryService eventService;
private KryoSerializationManager serializationMgr;
@Before @Before
public void setUp() { public void setUp() {
@ -111,10 +108,7 @@ public class DistributedDeviceManagerTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate(); storeManager.activate();
serializationMgr = new KryoSerializationManager(); dstore = new TestDistributedDeviceStore(storeManager);
serializationMgr.activate();
dstore = new TestDistributedDeviceStore(storeManager, serializationMgr);
dstore.activate(); dstore.activate();
mgr.store = dstore; mgr.store = dstore;
@ -140,7 +134,6 @@ public class DistributedDeviceManagerTest {
mgr.deactivate(); mgr.deactivate();
dstore.deactivate(); dstore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate(); storeManager.deactivate();
} }
@ -306,10 +299,8 @@ public class DistributedDeviceManagerTest {
private class TestDistributedDeviceStore extends DistributedDeviceStore { private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService, public TestDistributedDeviceStore(StoreService storeService) {
KryoSerializationService kryoSerializationService) {
this.storeService = storeService; this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
} }
} }

View File

@ -57,7 +57,7 @@ public class DistributedClusterStore
rawNodes = theInstance.getMap("nodes"); rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawNodes); = new OptionalCacheLoader<>(serializer, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true); rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);

View File

@ -52,7 +52,7 @@ implements MastershipStore {
rawMasters = theInstance.getMap("masters"); rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawMasters); = new OptionalCacheLoader<>(serializer, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true); rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);

View File

@ -15,7 +15,8 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.Event; import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore; import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate; import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializationService; import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.Serializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
@ -24,7 +25,7 @@ import static org.slf4j.LoggerFactory.getLogger;
/** /**
* Abstraction of a distributed store based on Hazelcast. * Abstraction of a distributed store based on Hazelcast.
*/ */
@Component(componentAbstract = true) @Component
public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>> public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> { extends AbstractStore<E, D> {
@ -33,13 +34,13 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService; protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected Serializer serializer;
protected KryoSerializationService kryoSerializationService;
protected HazelcastInstance theInstance; protected HazelcastInstance theInstance;
@Activate @Activate
public void activate() { public void activate() {
serializer = new KryoSerializer();
theInstance = storeService.getHazelcastInstance(); theInstance = storeService.getHazelcastInstance();
} }
@ -50,7 +51,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return serialized object * @return serialized object
*/ */
protected byte[] serialize(Object obj) { protected byte[] serialize(Object obj) {
return kryoSerializationService.encode(obj); return serializer.encode(obj);
} }
/** /**
@ -61,7 +62,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return deserialized object * @return deserialized object
*/ */
protected <T> T deserialize(byte[] bytes) { protected <T> T deserialize(byte[] bytes) {
return kryoSerializationService.decode(bytes); return serializer.decode(bytes);
} }

View File

@ -2,7 +2,7 @@ package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.serializers.KryoSerializationService; import org.onlab.onos.store.serializers.Serializer;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
@ -18,28 +18,28 @@ import com.hazelcast.core.IMap;
public final class OptionalCacheLoader<K, V> extends public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> { CacheLoader<K, Optional<V>> {
private final KryoSerializationService kryoSerializationService; private final Serializer serializer;
private IMap<byte[], byte[]> rawMap; private IMap<byte[], byte[]> rawMap;
/** /**
* Constructor. * Constructor.
* *
* @param kryoSerializationService to use for serialization * @param serializer to use for serialization
* @param rawMap underlying IMap * @param rawMap underlying IMap
*/ */
public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) { public OptionalCacheLoader(Serializer serializer, IMap<byte[], byte[]> rawMap) {
this.kryoSerializationService = checkNotNull(kryoSerializationService); this.serializer = checkNotNull(serializer);
this.rawMap = checkNotNull(rawMap); this.rawMap = checkNotNull(rawMap);
} }
@Override @Override
public Optional<V> load(K key) throws Exception { public Optional<V> load(K key) throws Exception {
byte[] keyBytes = kryoSerializationService.encode(key); byte[] keyBytes = serializer.encode(key);
byte[] valBytes = rawMap.get(keyBytes); byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) { if (valBytes == null) {
return Optional.absent(); return Optional.absent();
} }
V dev = kryoSerializationService.decode(valBytes); V dev = serializer.decode(valBytes);
return Optional.of(dev); return Optional.of(dev);
} }
} }

View File

@ -88,7 +88,7 @@ public class DistributedDeviceStore
// TODO decide on Map name scheme to avoid collision // TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices"); rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawDevices); = new OptionalCacheLoader<>(serializer, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader)); devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance // refresh/populate cache based on notification from other instance
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue); devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
@ -98,7 +98,7 @@ public class DistributedDeviceStore
rawDevicePorts = theInstance.getMap("devicePorts"); rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts); = new OptionalCacheLoader<>(serializer, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader)); devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance // refresh/populate cache based on notification from other instance
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue); portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);

View File

@ -71,7 +71,7 @@ public class DistributedLinkStore
// TODO decide on Map name scheme to avoid collision // TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links"); rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawLinks); = new OptionalCacheLoader<>(serializer, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader)); links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance // refresh/populate cache based on notification from other instance
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue); linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);

View File

@ -36,9 +36,6 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager; import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService; import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager; import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.hazelcast.config.Config; import com.hazelcast.config.Config;
@ -63,7 +60,6 @@ public class DistributedDeviceStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3); private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore; private DistributedDeviceStore deviceStore;
private KryoSerializationManager serializationMgr;
private StoreManager storeManager; private StoreManager storeManager;
@ -85,10 +81,7 @@ public class DistributedDeviceStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate(); storeManager.activate();
serializationMgr = new KryoSerializationManager(); deviceStore = new TestDistributedDeviceStore(storeManager);
serializationMgr.activate();
deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
deviceStore.activate(); deviceStore.activate();
} }
@ -96,8 +89,6 @@ public class DistributedDeviceStoreTest {
public void tearDown() throws Exception { public void tearDown() throws Exception {
deviceStore.deactivate(); deviceStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate(); storeManager.deactivate();
} }
@ -392,10 +383,8 @@ public class DistributedDeviceStoreTest {
} }
private class TestDistributedDeviceStore extends DistributedDeviceStore { private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService, public TestDistributedDeviceStore(StoreService storeService) {
KryoSerializationService kryoSerializationService) {
this.storeService = storeService; this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
} }
} }
} }

View File

@ -30,9 +30,6 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager; import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService; import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager; import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.hazelcast.config.Config; import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast; import com.hazelcast.core.Hazelcast;
@ -51,7 +48,6 @@ public class DistributedLinkStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3); private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager; private StoreManager storeManager;
private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore; private DistributedLinkStore linkStore;
@ -71,17 +67,13 @@ public class DistributedLinkStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate(); storeManager.activate();
serializationMgr = new KryoSerializationManager(); linkStore = new TestDistributedLinkStore(storeManager);
serializationMgr.activate();
linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
linkStore.activate(); linkStore.activate();
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
linkStore.deactivate(); linkStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate(); storeManager.deactivate();
} }
@ -361,10 +353,8 @@ public class DistributedLinkStoreTest {
class TestDistributedLinkStore extends DistributedLinkStore { class TestDistributedLinkStore extends DistributedLinkStore {
TestDistributedLinkStore(StoreService storeService, TestDistributedLinkStore(StoreService storeService) {
KryoSerializationService kryoSerializationService) {
this.storeService = storeService; this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
} }
} }
} }

View File

@ -1,9 +1,5 @@
package org.onlab.onos.store.serializers; package org.onlab.onos.store.serializers;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoPool; import org.onlab.util.KryoPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -11,25 +7,16 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* Serialization service using Kryo. * Serializer implementation using Kryo.
*/ */
@Component(immediate = true) public class KryoSerializer implements Serializer {
@Service
public class KryoSerializationManager implements KryoSerializationService {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool; private KryoPool serializerPool;
@Activate public KryoSerializer() {
public void activate() {
setupKryoPool(); setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
} }
/** /**

View File

@ -6,11 +6,10 @@ import java.nio.ByteBuffer;
/** /**
* Service to serialize Objects into byte array. * Service to serialize Objects into byte array.
*/ */
public interface KryoSerializationService { public interface Serializer {
/** /**
* Serializes the specified object into bytes using one of the * Serializes the specified object into bytes.
* pre-registered serializers.
* *
* @param obj object to be serialized * @param obj object to be serialized
* @return serialized bytes * @return serialized bytes
@ -18,8 +17,7 @@ public interface KryoSerializationService {
public byte[] encode(final Object obj); public byte[] encode(final Object obj);
/** /**
* Serializes the specified object into bytes using one of the * Serializes the specified object into bytes.
* pre-registered serializers.
* *
* @param obj object to be serialized * @param obj object to be serialized
* @param buffer to write serialized bytes * @param buffer to write serialized bytes
@ -27,8 +25,7 @@ public interface KryoSerializationService {
public void encode(final Object obj, ByteBuffer buffer); public void encode(final Object obj, ByteBuffer buffer);
/** /**
* Deserializes the specified bytes into an object using one of the * Deserializes the specified bytes into an object.
* pre-registered serializers.
* *
* @param bytes bytes to be deserialized * @param bytes bytes to be deserialized
* @return deserialized object * @return deserialized object
@ -36,8 +33,7 @@ public interface KryoSerializationService {
public <T> T decode(final byte[] bytes); public <T> T decode(final byte[] bytes);
/** /**
* Deserializes the specified bytes into an object using one of the * Deserializes the specified bytes into an object.
* pre-registered serializers.
* *
* @param buffer bytes to be deserialized * @param buffer bytes to be deserialized
* @return deserialized object * @return deserialized object

View File

@ -53,6 +53,8 @@
<feature>onos-api</feature> <feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle> <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle> <bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onlab-netty/1.0.0-SNAPSHOT</bundle>
</feature> </feature>
<feature name="onos-core-hazelcast" version="1.0.0" <feature name="onos-core-hazelcast" version="1.0.0"

View File

@ -39,6 +39,7 @@ public interface MessagingService {
*/ */
public void unregisterHandler(String type); public void unregisterHandler(String type);
// FIXME: remove me and add PayloadSerializer to all other methods
/** /**
* Specify the serializer to use for encoding/decoding payload. * Specify the serializer to use for encoding/decoding payload.
* *