initial Distributed IntentStore using Hz

Change-Id: Iffb3f5fdfe8ba080fd039e67f8473ea18348f20d
This commit is contained in:
Yuta HIGUCHI 2014-10-28 14:42:06 -07:00
parent f5d90939a7
commit 10a31c3b85
4 changed files with 80 additions and 16 deletions

View File

@ -86,13 +86,13 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
IntentEvent setState(Intent intent, IntentState newState); IntentEvent setState(Intent intent, IntentState newState);
/** /**
* Adds the installable intents which resulted from compilation of the * Sets the installable intents which resulted from compilation of the
* specified original intent. * specified original intent.
* *
* @param intentId original intent identifier * @param intentId original intent identifier
* @param installableIntents compiled installable intents * @param installableIntents compiled installable intents
*/ */
void addInstallableIntents(IntentId intentId, List<Intent> installableIntents); void setInstallableIntents(IntentId intentId, List<Intent> installableIntents);
/** /**
* Returns the list of the installable events associated with the specified * Returns the list of the installable events associated with the specified

View File

@ -273,7 +273,7 @@ public class IntentManager
// If all went well, associate the resulting list of installable // If all went well, associate the resulting list of installable
// intents with the top-level intent and proceed to install. // intents with the top-level intent and proceed to install.
store.addInstallableIntents(intent.id(), installable); store.setInstallableIntents(intent.id(), installable);
executeInstallingPhase(intent); executeInstallingPhase(intent);
} catch (Exception e) { } catch (Exception e) {
@ -366,7 +366,7 @@ public class IntentManager
} else { } else {
// Otherwise, re-associate the newly compiled installable intents // Otherwise, re-associate the newly compiled installable intents
// with the top-level intent and kick off installing phase. // with the top-level intent and kick off installing phase.
store.addInstallableIntents(intent.id(), installable); store.setInstallableIntents(intent.id(), installable);
executeInstallingPhase(intent); executeInstallingPhase(intent);
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -16,6 +16,8 @@
package org.onlab.onos.store.intent.impl; package org.onlab.onos.store.intent.impl;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
@ -26,30 +28,54 @@ import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentState; import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore; import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate; import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.store.AbstractStore; import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Verify.verify;
import static org.onlab.onos.net.intent.IntentState.*; import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true) @Component(immediate = true)
@Service @Service
public class DistributedIntentStore public class DistributedIntentStore
extends AbstractStore<IntentEvent, IntentStoreDelegate> extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
implements IntentStore { implements IntentStore {
private final Logger log = getLogger(getClass()); private final Logger log = getLogger(getClass());
private final Map<IntentId, Intent> intents = new ConcurrentHashMap<>();
private final Map<IntentId, IntentState> states = new ConcurrentHashMap<>();
private final Map<IntentId, List<Intent>> installable = new ConcurrentHashMap<>();
// Assumption: IntentId will not have synonyms
private SMap<IntentId, Intent> intents;
private SMap<IntentId, IntentState> states;
// Map to store instance local intermediate state transition
private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
private SMap<IntentId, List<Intent>> installable;
@Override
@Activate @Activate
public void activate() { public void activate() {
super.activate();
// TODO: enable near cache, allow read from backup for this IMap
IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
intents = new SMap<>(rawIntents , super.serializer);
// TODO: disable near cache, disable read from backup for this IMap
IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
states = new SMap<>(rawStates , super.serializer);
transientStates.clear();
// TODO: disable near cache, disable read from backup for this IMap
IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
installable = new SMap<>(rawInstallables , super.serializer);
log.info("Started"); log.info("Started");
} }
@ -60,16 +86,27 @@ public class DistributedIntentStore
@Override @Override
public IntentEvent createIntent(Intent intent) { public IntentEvent createIntent(Intent intent) {
intents.put(intent.id(), intent); Intent existing = intents.putIfAbsent(intent.id(), intent);
return this.setState(intent, IntentState.SUBMITTED); if (existing != null) {
// duplicate, ignore
return null;
} else {
return this.setState(intent, IntentState.SUBMITTED);
}
} }
@Override @Override
public IntentEvent removeIntent(IntentId intentId) { public IntentEvent removeIntent(IntentId intentId) {
Intent intent = intents.remove(intentId); Intent intent = intents.remove(intentId);
installable.remove(intentId); installable.remove(intentId);
if (intent == null) {
// was already removed
return null;
}
IntentEvent event = this.setState(intent, WITHDRAWN); IntentEvent event = this.setState(intent, WITHDRAWN);
states.remove(intentId); states.remove(intentId);
transientStates.remove(intentId);
// TODO: Should we callremoveInstalledIntents if this Intent was
return event; return event;
} }
@ -90,31 +127,53 @@ public class DistributedIntentStore
@Override @Override
public IntentState getIntentState(IntentId id) { public IntentState getIntentState(IntentId id) {
final IntentState localState = transientStates.get(id);
if (localState != null) {
return localState;
}
return states.get(id); return states.get(id);
} }
@Override @Override
public IntentEvent setState(Intent intent, IntentState state) { public IntentEvent setState(Intent intent, IntentState state) {
IntentId id = intent.id(); final IntentId id = intent.id();
states.put(id, state);
IntentEvent.Type type = null; IntentEvent.Type type = null;
IntentState prev = null;
boolean transientStateChangeOnly = false;
// TODO: enable sanity checking if Debug enabled, etc.
switch (state) { switch (state) {
case SUBMITTED: case SUBMITTED:
prev = states.putIfAbsent(id, SUBMITTED);
verify(prev == null, "Illegal state transition attempted from %s to SUBMITTED", prev);
type = IntentEvent.Type.SUBMITTED; type = IntentEvent.Type.SUBMITTED;
break; break;
case INSTALLED: case INSTALLED:
// parking state transition
prev = states.replace(id, INSTALLED);
verify(prev != null, "Illegal state transition attempted from non-SUBMITTED to INSTALLED");
type = IntentEvent.Type.INSTALLED; type = IntentEvent.Type.INSTALLED;
break; break;
case FAILED: case FAILED:
prev = states.replace(id, FAILED);
type = IntentEvent.Type.FAILED; type = IntentEvent.Type.FAILED;
break; break;
case WITHDRAWN: case WITHDRAWN:
prev = states.replace(id, WITHDRAWN);
verify(prev != null, "Illegal state transition attempted from non-WITHDRAWING to WITHDRAWN");
type = IntentEvent.Type.WITHDRAWN; type = IntentEvent.Type.WITHDRAWN;
break; break;
default: default:
transientStateChangeOnly = true;
break; break;
} }
if (!transientStateChangeOnly) {
log.debug("Parking State change: {} {}=>{}", id, prev, state);
}
// Update instance local state, which includes non-parking state transition
prev = transientStates.put(id, state);
log.debug("Transient State change: {} {}=>{}", id, prev, state);
if (type == null) { if (type == null) {
return null; return null;
} }
@ -122,7 +181,7 @@ public class DistributedIntentStore
} }
@Override @Override
public void addInstallableIntents(IntentId intentId, List<Intent> result) { public void setInstallableIntents(IntentId intentId, List<Intent> result) {
installable.put(intentId, result); installable.put(intentId, result);
} }
@ -136,4 +195,5 @@ public class DistributedIntentStore
installable.remove(intentId); installable.remove(intentId);
} }
// FIXME add handler to react to remote event
} }

View File

@ -68,6 +68,10 @@ public class SimpleIntentStore
public IntentEvent removeIntent(IntentId intentId) { public IntentEvent removeIntent(IntentId intentId) {
Intent intent = intents.remove(intentId); Intent intent = intents.remove(intentId);
installable.remove(intentId); installable.remove(intentId);
if (intent == null) {
// was already removed
return null;
}
IntentEvent event = this.setState(intent, WITHDRAWN); IntentEvent event = this.setState(intent, WITHDRAWN);
states.remove(intentId); states.remove(intentId);
return event; return event;
@ -122,7 +126,7 @@ public class SimpleIntentStore
} }
@Override @Override
public void addInstallableIntents(IntentId intentId, List<Intent> result) { public void setInstallableIntents(IntentId intentId, List<Intent> result) {
installable.put(intentId, result); installable.put(intentId, result);
} }