From 3cc4d9beee4bff3c5d19f4c04c313cefc285c0cf Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Sat, 29 Nov 2014 18:17:17 -0800 Subject: [PATCH] HazelcastLinkResourceStore Change-Id: Ic5d6bf9b54b023368a883e3665484900ccda44e7 --- .../java/org/onlab/onos/store/hz/STxMap.java | 211 ++++++++ .../impl/HazelcastLinkResourceStore.java | 463 ++++++++++++++++++ .../impl/HazelcastLinkResourceStoreTest.java | 194 ++++++++ 3 files changed, 868 insertions(+) create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/hz/STxMap.java create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStore.java create mode 100644 core/store/dist/src/test/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStoreTest.java diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/STxMap.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/STxMap.java new file mode 100644 index 0000000000..0f2737a11c --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/STxMap.java @@ -0,0 +1,211 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.onos.store.hz; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.onlab.onos.store.serializers.StoreSerializer; + +import com.hazelcast.core.TransactionalMap; +import com.hazelcast.query.Predicate; + +// TODO: implement Predicate, etc. if we need them. +/** + * Wrapper around TransactionalMap<byte[], byte[]> which serializes/deserializes + * key and value using StoreSerializer. + * + * @param key type + * @param value type + */ +public class STxMap implements TransactionalMap { + + private final TransactionalMap m; + private final StoreSerializer serializer; + + /** + * Creates a STxMap instance. + * + * @param baseMap base IMap to use + * @param serializer serializer to use for both key and value + */ + public STxMap(TransactionalMap baseMap, StoreSerializer serializer) { + this.m = checkNotNull(baseMap); + this.serializer = checkNotNull(serializer); + } + + @Override + public int size() { + return m.size(); + } + + @Override + public boolean isEmpty() { + return m.isEmpty(); + } + + @Deprecated + @Override + public Object getId() { + return m.getId(); + } + + @Override + public String getPartitionKey() { + return m.getPartitionKey(); + } + + @Override + public String getName() { + return m.getName(); + } + + @Override + public String getServiceName() { + return m.getServiceName(); + } + + @Override + public void destroy() { + m.destroy(); + } + + @Override + public boolean containsKey(Object key) { + return m.containsKey(serializeKey(key)); + } + + @Override + public V get(Object key) { + return deserializeVal(m.get(serializeKey(key))); + } + + @Override + public V getForUpdate(Object key) { + // TODO Auto-generated method stub + return deserializeVal(m.getForUpdate(serializeKey(key))); + } + + @Override + public V put(K key, V value) { + return deserializeVal(m.put(serializeKey(key), serializeVal(value))); + } + + @Override + public V remove(Object key) { + return deserializeVal(m.remove(serializeKey(key))); + } + + @Override + public boolean remove(Object key, Object value) { + return m.remove(serializeKey(key), serializeVal(value)); + } + + @Override + public void delete(Object key) { + m.delete(serializeKey(key)); + } + + @Override + public V put(K key, V value, long ttl, TimeUnit timeunit) { + return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit)); + } + + @Override + public V putIfAbsent(K key, V value) { + return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value))); + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue)); + } + + @Override + public V replace(K key, V value) { + return deserializeVal(m.replace(serializeKey(key), serializeVal(value))); + } + + @Override + public void set(K key, V value) { + m.set(serializeKey(key), serializeVal(value)); + } + + + @Override + public Set keySet() { + return deserializeKeySet(m.keySet()); + } + + @Override + public Collection values() { + return deserializeVals(m.values()); + } + + @Deprecated // marking method not implemented + @SuppressWarnings("rawtypes") + @Override + public Set keySet(Predicate predicate) { + throw new UnsupportedOperationException(); + } + + @Deprecated // marking method not implemented + @SuppressWarnings("rawtypes") + @Override + public Collection values(Predicate predicate) { + throw new UnsupportedOperationException(); + } + + private byte[] serializeKey(Object key) { + return serializer.encode(key); + } + + private K deserializeKey(byte[] key) { + return serializer.decode(key); + } + + private byte[] serializeVal(Object val) { + return serializer.encode(val); + } + + private V deserializeVal(byte[] val) { + if (val == null) { + return null; + } + return serializer.decode(val.clone()); + } + + private Set deserializeKeySet(Set keys) { + Set dsk = new HashSet<>(keys.size()); + for (byte[] key : keys) { + dsk.add(deserializeKey(key)); + } + return dsk; + } + + private Collection deserializeVals(Collection vals) { + Collection dsl = new ArrayList<>(vals.size()); + for (byte[] val : vals) { + dsl.add(deserializeVal(val)); + } + return dsl; + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStore.java new file mode 100644 index 0000000000..4729a84b4e --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStore.java @@ -0,0 +1,463 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.onos.store.resource.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.net.Link; +import org.onlab.onos.net.LinkKey; +import org.onlab.onos.net.intent.IntentId; +import org.onlab.onos.net.link.LinkService; +import org.onlab.onos.net.resource.Bandwidth; +import org.onlab.onos.net.resource.BandwidthResourceAllocation; +import org.onlab.onos.net.resource.Lambda; +import org.onlab.onos.net.resource.LambdaResourceAllocation; +import org.onlab.onos.net.resource.LinkResourceAllocations; +import org.onlab.onos.net.resource.LinkResourceEvent; +import org.onlab.onos.net.resource.LinkResourceStore; +import org.onlab.onos.net.resource.ResourceAllocation; +import org.onlab.onos.net.resource.ResourceType; +import org.onlab.onos.store.StoreDelegate; +import org.onlab.onos.store.hz.AbstractHazelcastStore; +import org.onlab.onos.store.hz.STxMap; +import org.slf4j.Logger; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.hazelcast.core.TransactionalMap; +import com.hazelcast.transaction.TransactionContext; +import com.hazelcast.transaction.TransactionException; +import com.hazelcast.transaction.TransactionOptions; +import com.hazelcast.transaction.TransactionOptions.TransactionType; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages link resources using Hazelcast. + */ +@Component(immediate = true, enabled = false) +@Service +public class HazelcastLinkResourceStore + extends AbstractHazelcastStore> + implements LinkResourceStore { + + + private final Logger log = getLogger(getClass()); + + // FIXME: what is the Bandwidth unit? + private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.valueOf(1_000); + + private static final Bandwidth EMPTY_BW = Bandwidth.valueOf(0); + + // table to store current allocations + /** LinkKey -> List. */ + private static final String LINK_RESOURCE_ALLOCATIONS = "LinkResourceAllocations"; + + /** IntentId -> LinkResourceAllocations. */ + private static final String INTENT_ALLOCATIONS = "IntentAllocations"; + + + // TODO make this configurable + // number of retries to attempt on allocation failure, due to + // concurrent update + private static int maxAllocateRetries = 5; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected LinkService linkService; + + // Link annotation key name to use as bandwidth + private String bandwidthAnnotation = "bandwidth"; + // Link annotation key name to use as max lambda + private String wavesAnnotation = "optical.waves"; + + @Override + @Activate + public void activate() { + super.activate(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + private STxMap getIntentAllocs(TransactionContext tx) { + TransactionalMap raw = tx.getMap(INTENT_ALLOCATIONS); + return new STxMap<>(raw, serializer); + } + + private STxMap> getLinkAllocs(TransactionContext tx) { + TransactionalMap raw = tx.getMap(LINK_RESOURCE_ALLOCATIONS); + return new STxMap<>(raw, serializer); + } + + private Set getResourceCapacity(ResourceType type, Link link) { + // TODO: plugin/provider mechanism to add resource type in the future? + if (type == ResourceType.BANDWIDTH) { + return ImmutableSet.of(getBandwidthResourceCapacity(link)); + } + if (type == ResourceType.LAMBDA) { + return getLambdaResourceCapacity(link); + } + return null; + } + + private Set getLambdaResourceCapacity(Link link) { + // FIXME enumerate all the possible link/port lambdas + Set allocations = new HashSet<>(); + try { + final int waves = Integer.parseInt(link.annotations().value(wavesAnnotation)); + for (int i = 1; i <= waves; i++) { + allocations.add(new LambdaResourceAllocation(Lambda.valueOf(i))); + } + } catch (NumberFormatException e) { + log.debug("No {} annotation on link %s", wavesAnnotation, link); + } + return allocations; + } + + private BandwidthResourceAllocation getBandwidthResourceCapacity(Link link) { + + // if Link annotation exist, use them + // if all fails, use DEFAULT_BANDWIDTH + + Bandwidth bandwidth = null; + String strBw = link.annotations().value(bandwidthAnnotation); + if (strBw != null) { + try { + bandwidth = Bandwidth.valueOf(Double.parseDouble(strBw)); + } catch (NumberFormatException e) { + // do nothings + bandwidth = null; + } + } + + if (bandwidth == null) { + // fall back, use fixed default + bandwidth = DEFAULT_BANDWIDTH; + } + return new BandwidthResourceAllocation(bandwidth); + } + + private Map> getResourceCapacity(Link link) { + Map> caps = new HashMap<>(); + for (ResourceType type : ResourceType.values()) { + Set cap = getResourceCapacity(type, link); + if (cap != null) { + caps.put(type, cap); + } + } + return caps; + } + + @Override + public Set getFreeResources(Link link) { + Map> freeResources = getFreeResourcesEx(link); + Set allFree = new HashSet<>(); + for (Set r:freeResources.values()) { + allFree.addAll(r); + } + return allFree; + } + + private Map> getFreeResourcesEx(Link link) { + // returns capacity - allocated + + checkNotNull(link); + Map> free = new HashMap<>(); + final Map> caps = getResourceCapacity(link); + final Iterable allocations = getAllocations(link); + + for (ResourceType type : ResourceType.values()) { + // there should be class/category of resources + switch (type) { + case BANDWIDTH: + { + Set bw = caps.get(ResourceType.BANDWIDTH); + if (bw == null || bw.isEmpty()) { + bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW)); + } + + BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next(); + double freeBw = cap.bandwidth().toDouble(); + + // enumerate current allocations, subtracting resources + for (LinkResourceAllocations alloc : allocations) { + Set types = alloc.getResourceAllocation(link); + for (ResourceAllocation a : types) { + if (a instanceof BandwidthResourceAllocation) { + BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a; + freeBw -= bwA.bandwidth().toDouble(); + } + } + } + + free.put(type, Sets.newHashSet(new BandwidthResourceAllocation(Bandwidth.valueOf(freeBw)))); + break; + } + + case LAMBDA: + { + Set lmd = caps.get(type); + if (lmd == null || lmd.isEmpty()) { + // nothing left + break; + } + Set freeL = new HashSet<>(); + for (ResourceAllocation r : lmd) { + if (r instanceof LambdaResourceAllocation) { + freeL.add((LambdaResourceAllocation) r); + } + } + + // enumerate current allocations, removing resources + for (LinkResourceAllocations alloc : allocations) { + Set types = alloc.getResourceAllocation(link); + for (ResourceAllocation a : types) { + if (a instanceof LambdaResourceAllocation) { + freeL.remove(a); + } + } + } + + free.put(type, freeL); + break; + } + + default: + break; + } + } + return free; + } + + @Override + public void allocateResources(LinkResourceAllocations allocations) { + checkNotNull(allocations); + + for (int i = 0; i < maxAllocateRetries; ++i) { + TransactionContext tx = theInstance.newTransactionContext(); + tx.beginTransaction(); + try { + + STxMap intentAllocs = getIntentAllocs(tx); + // should this be conditional write? + intentAllocs.put(allocations.intendId(), allocations); + + for (Link link : allocations.links()) { + allocateLinkResource(tx, link, allocations); + } + + tx.commitTransaction(); + return; + } catch (TransactionException e) { + log.debug("Failed to commit allocations for {}. [retry={}]", + allocations.intendId(), i); + log.trace(" details {} ", allocations, e); + continue; + } catch (Exception e) { + log.error("Exception thrown, rolling back", e); + tx.rollbackTransaction(); + throw e; + } + } + } + + private void allocateLinkResource(TransactionContext tx, Link link, + LinkResourceAllocations allocations) { + + // requested resources + Set reqs = allocations.getResourceAllocation(link); + + Map> available = getFreeResourcesEx(link); + for (ResourceAllocation req : reqs) { + Set avail = available.get(req.type()); + if (req instanceof BandwidthResourceAllocation) { + // check if allocation should be accepted + if (avail.isEmpty()) { + checkState(!avail.isEmpty(), + "There's no Bandwidth resource on %s?", + link); + } + BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next(); + double bwLeft = bw.bandwidth().toDouble(); + bwLeft -= ((BandwidthResourceAllocation) req).bandwidth().toDouble(); + if (bwLeft < 0) { + // FIXME throw appropriate Exception + checkState(bwLeft >= 0, + "There's no Bandwidth left on %s. %s", + link, bwLeft); + } + } else if (req instanceof LambdaResourceAllocation) { + + // check if allocation should be accepted + if (!avail.contains(req)) { + // requested lambda was not available + // FIXME throw appropriate exception + checkState(avail.contains(req), + "Allocating %s on %s failed", + req, link); + } + } + } + // all requests allocatable => add allocation + final LinkKey linkKey = LinkKey.linkKey(link); + STxMap> linkAllocs = getLinkAllocs(tx); + final List before = linkAllocs.get(linkKey); + List after = new ArrayList<>(before.size() + 1); + after.addAll(before); + after.add(allocations); + linkAllocs.replace(linkKey, before, after); + } + + @Override + public LinkResourceEvent releaseResources(LinkResourceAllocations allocations) { + checkNotNull(allocations); + + final IntentId intendId = allocations.intendId(); + final Collection links = allocations.links(); + + boolean success = false; + do { + // TODO: smaller tx unit to lower the chance of collisions? + TransactionContext tx = theInstance.newTransactionContext(); + tx.beginTransaction(); + try { + STxMap intentAllocs = getIntentAllocs(tx); + intentAllocs.remove(intendId); + + STxMap> linkAllocs = getLinkAllocs(tx); + + for (Link link : links) { + final LinkKey linkId = LinkKey.linkKey(link); + + List before = linkAllocs.get(linkId); + if (before == null || before.isEmpty()) { + // something is wrong, but it is already freed + log.warn("There was no resource left to release on {}", linkId); + continue; + } + List after = new ArrayList<>(before); + after.remove(allocations); + linkAllocs.replace(linkId, before, after); + } + + tx.commitTransaction(); + success = true; + } catch (TransactionException e) { + log.debug("Transaction failed, retrying"); + } catch (Exception e) { + log.error("Exception thrown during releaseResource {}", + allocations, e); + tx.rollbackTransaction(); + throw e; + } + } while (!success); + + // Issue events to force recompilation of intents. + final List releasedResources = + ImmutableList.of(allocations); + return new LinkResourceEvent( + LinkResourceEvent.Type.ADDITIONAL_RESOURCES_AVAILABLE, + releasedResources); + } + + @Override + public LinkResourceAllocations getAllocations(IntentId intentId) { + checkNotNull(intentId); + TransactionOptions opt = new TransactionOptions(); + // read-only and will never be commited, thus does not need durability + opt.setTransactionType(TransactionType.LOCAL); + TransactionContext tx = theInstance.newTransactionContext(opt); + tx.beginTransaction(); + try { + STxMap intentAllocs = getIntentAllocs(tx); + return intentAllocs.get(intentId); + } finally { + tx.rollbackTransaction(); + } + } + + @Override + public List getAllocations(Link link) { + checkNotNull(link); + final LinkKey key = LinkKey.linkKey(link); + + TransactionOptions opt = new TransactionOptions(); + // read-only and will never be commited, thus does not need durability + opt.setTransactionType(TransactionType.LOCAL); + TransactionContext tx = theInstance.newTransactionContext(opt); + tx.beginTransaction(); + List res = null; + try { + STxMap> linkAllocs = getLinkAllocs(tx); + res = linkAllocs.get(key); + } finally { + tx.rollbackTransaction(); + } + + if (res == null) { + // try to add empty list + TransactionContext tx2 = theInstance.newTransactionContext(); + tx2.beginTransaction(); + try { + res = getLinkAllocs(tx2).putIfAbsent(key, new ArrayList<>()); + tx2.commitTransaction(); + if (res == null) { + return Collections.emptyList(); + } else { + return res; + } + } catch (TransactionException e) { + // concurrently added? + return getAllocations(link); + } catch (Exception e) { + tx.rollbackTransaction(); + } + } + return res; + + } + + @Override + public Iterable getAllocations() { + TransactionContext tx = theInstance.newTransactionContext(); + tx.beginTransaction(); + try { + STxMap intentAllocs = getIntentAllocs(tx); + return intentAllocs.values(); + } finally { + tx.rollbackTransaction(); + } + } +} diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStoreTest.java new file mode 100644 index 0000000000..b6c63a7efa --- /dev/null +++ b/core/store/dist/src/test/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStoreTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.onos.store.resource.impl; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.onos.net.AnnotationKeys; +import org.onlab.onos.net.Annotations; +import org.onlab.onos.net.ConnectPoint; +import org.onlab.onos.net.DefaultAnnotations; +import org.onlab.onos.net.DefaultLink; +import org.onlab.onos.net.Link; +import org.onlab.onos.net.provider.ProviderId; +import org.onlab.onos.net.resource.Bandwidth; +import org.onlab.onos.net.resource.BandwidthResourceAllocation; +import org.onlab.onos.net.resource.LambdaResourceAllocation; +import org.onlab.onos.net.resource.LinkResourceAllocations; +import org.onlab.onos.net.resource.LinkResourceStore; +import org.onlab.onos.net.resource.ResourceAllocation; +import org.onlab.onos.net.resource.ResourceType; +import org.onlab.onos.store.hz.StoreService; +import org.onlab.onos.store.hz.TestStoreManager; + +import com.hazelcast.config.Config; +import com.hazelcast.core.Hazelcast; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.onlab.onos.net.DeviceId.deviceId; +import static org.onlab.onos.net.Link.Type.DIRECT; +import static org.onlab.onos.net.PortNumber.portNumber; + +/** + * Test of the simple LinkResourceStore implementation. + */ +public class HazelcastLinkResourceStoreTest { + + private LinkResourceStore store; + private HazelcastLinkResourceStore storeImpl; + private Link link1; + private Link link2; + private Link link3; + private TestStoreManager storeMgr; + + /** + * Returns {@link Link} object. + * + * @param dev1 source device + * @param port1 source port + * @param dev2 destination device + * @param port2 destination port + * @return created {@link Link} object + */ + private Link newLink(String dev1, int port1, String dev2, int port2) { + Annotations annotations = DefaultAnnotations.builder() + .set(AnnotationKeys.OPTICAL_WAVES, "80") + .set(AnnotationKeys.BANDWIDTH, "1000000") + .build(); + return new DefaultLink( + new ProviderId("of", "foo"), + new ConnectPoint(deviceId(dev1), portNumber(port1)), + new ConnectPoint(deviceId(dev2), portNumber(port2)), + DIRECT, annotations); + } + + @Before + public void setUp() throws Exception { + + Config config = TestStoreManager.getTestConfig(); + + storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); + storeMgr.activate(); + + + storeImpl = new TestHazelcastLinkResourceStore(storeMgr); + storeImpl.activate(); + store = storeImpl; + + link1 = newLink("of:1", 1, "of:2", 2); + link2 = newLink("of:2", 1, "of:3", 2); + link3 = newLink("of:3", 1, "of:4", 2); + } + + @After + public void tearDown() throws Exception { + storeImpl.deactivate(); + + storeMgr.deactivate(); + } + + /** + * Tests constructor and activate method. + */ + @Test + public void testConstructorAndActivate() { + final Iterable allAllocations = store.getAllocations(); + assertNotNull(allAllocations); + assertFalse(allAllocations.iterator().hasNext()); + + final Iterable linkAllocations = + store.getAllocations(link1); + assertNotNull(linkAllocations); + assertFalse(linkAllocations.iterator().hasNext()); + + final Set res = store.getFreeResources(link2); + assertNotNull(res); + } + + /** + * Picks up and returns one of bandwidth allocations from a given set. + * + * @param resources the set of {@link ResourceAllocation}s + * @return {@link BandwidthResourceAllocation} object if found, null + * otherwise + */ + private BandwidthResourceAllocation getBandwidthObj(Set resources) { + for (ResourceAllocation res : resources) { + if (res.type() == ResourceType.BANDWIDTH) { + return ((BandwidthResourceAllocation) res); + } + } + return null; + } + + /** + * Returns all lambda allocations from a given set. + * + * @param resources the set of {@link ResourceAllocation}s + * @return a set of {@link LambdaResourceAllocation} objects + */ + private Set getLambdaObjs(Set resources) { + Set lambdaResources = new HashSet<>(); + for (ResourceAllocation res : resources) { + if (res.type() == ResourceType.LAMBDA) { + lambdaResources.add((LambdaResourceAllocation) res); + } + } + return lambdaResources; + } + + /** + * Tests initial free bandwidth for a link. + */ + @Test + public void testInitialBandwidth() { + final Set freeRes = store.getFreeResources(link1); + assertNotNull(freeRes); + + final BandwidthResourceAllocation alloc = getBandwidthObj(freeRes); + assertNotNull(alloc); + + assertEquals(Bandwidth.valueOf(1000000.0), alloc.bandwidth()); + } + + /** + * Tests initial free lambda for a link. + */ + @Test + public void testInitialLambdas() { + final Set freeRes = store.getFreeResources(link3); + assertNotNull(freeRes); + + final Set res = getLambdaObjs(freeRes); + assertNotNull(res); + assertEquals(80, res.size()); + } + + public static final class TestHazelcastLinkResourceStore + extends HazelcastLinkResourceStore { + + public TestHazelcastLinkResourceStore(StoreService storeMgr) { + super.storeService = storeMgr; + } + + } +}