mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-10-24 13:51:27 +02:00
Add probes to DistributedIntentStore
Change-Id: I23a5823d3924392dc17166404a17fc1918c01453
This commit is contained in:
parent
a22f69f28a
commit
e367fb9718
@ -15,6 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.onlab.onos.store.intent.impl;
|
package org.onlab.onos.store.intent.impl;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
import com.codahale.metrics.Timer.Context;
|
||||||
import com.google.common.base.Verify;
|
import com.google.common.base.Verify;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
|
||||||
@ -24,6 +26,8 @@ import org.apache.felix.scr.annotations.Deactivate;
|
|||||||
import org.apache.felix.scr.annotations.Reference;
|
import org.apache.felix.scr.annotations.Reference;
|
||||||
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
||||||
import org.apache.felix.scr.annotations.Service;
|
import org.apache.felix.scr.annotations.Service;
|
||||||
|
import org.onlab.metrics.MetricsService;
|
||||||
|
import org.onlab.onos.core.MetricsHelper;
|
||||||
import org.onlab.onos.net.intent.Intent;
|
import org.onlab.onos.net.intent.Intent;
|
||||||
import org.onlab.onos.net.intent.IntentEvent;
|
import org.onlab.onos.net.intent.IntentEvent;
|
||||||
import org.onlab.onos.net.intent.IntentId;
|
import org.onlab.onos.net.intent.IntentId;
|
||||||
@ -48,12 +52,13 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
|
|
||||||
import static org.onlab.onos.net.intent.IntentState.*;
|
import static org.onlab.onos.net.intent.IntentState.*;
|
||||||
import static org.slf4j.LoggerFactory.getLogger;
|
import static org.slf4j.LoggerFactory.getLogger;
|
||||||
|
import static org.onlab.metrics.MetricsUtil.*;
|
||||||
|
|
||||||
@Component(immediate = true, enabled = true)
|
@Component(immediate = true, enabled = true)
|
||||||
@Service
|
@Service
|
||||||
public class DistributedIntentStore
|
public class DistributedIntentStore
|
||||||
extends AbstractStore<IntentEvent, IntentStoreDelegate>
|
extends AbstractStore<IntentEvent, IntentStoreDelegate>
|
||||||
implements IntentStore {
|
implements IntentStore, MetricsHelper {
|
||||||
|
|
||||||
/** Valid parking state, which can transition to INSTALLED. */
|
/** Valid parking state, which can transition to INSTALLED. */
|
||||||
private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(SUBMITTED, INSTALLED, FAILED);
|
private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(SUBMITTED, INSTALLED, FAILED);
|
||||||
@ -81,11 +86,41 @@ public class DistributedIntentStore
|
|||||||
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||||
protected DatabaseService dbService;
|
protected DatabaseService dbService;
|
||||||
|
|
||||||
|
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||||
|
protected MetricsService metricsService;
|
||||||
|
|
||||||
// TODO make this configurable
|
// TODO make this configurable
|
||||||
private boolean onlyLogTransitionError = true;
|
private boolean onlyLogTransitionError = true;
|
||||||
|
|
||||||
|
private Timer createIntentTimer;
|
||||||
|
private Timer removeIntentTimer;
|
||||||
|
private Timer setInstallableIntentsTimer;
|
||||||
|
private Timer getInstallableIntentsTimer;
|
||||||
|
private Timer removeInstalledIntentsTimer;
|
||||||
|
private Timer setStateTimer;
|
||||||
|
private Timer getIntentCountTimer;
|
||||||
|
private Timer getIntentsTimer;
|
||||||
|
private Timer getIntentTimer;
|
||||||
|
private Timer getIntentStateTimer;
|
||||||
|
|
||||||
|
|
||||||
|
private Timer createResponseTimer(String methodName) {
|
||||||
|
return createTimer("IntentStore", methodName, "responseTime");
|
||||||
|
}
|
||||||
|
|
||||||
@Activate
|
@Activate
|
||||||
public void activate() {
|
public void activate() {
|
||||||
|
createIntentTimer = createResponseTimer("createIntent");
|
||||||
|
removeIntentTimer = createResponseTimer("removeIntent");
|
||||||
|
setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
|
||||||
|
getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
|
||||||
|
removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
|
||||||
|
setStateTimer = createResponseTimer("setState");
|
||||||
|
getIntentCountTimer = createResponseTimer("getIntentCount");
|
||||||
|
getIntentsTimer = createResponseTimer("getIntents");
|
||||||
|
getIntentTimer = createResponseTimer("getIntent");
|
||||||
|
getIntentStateTimer = createResponseTimer("getIntentState");
|
||||||
|
|
||||||
// FIXME: We need a way to add serializer for intents which has been plugged-in.
|
// FIXME: We need a way to add serializer for intents which has been plugged-in.
|
||||||
// As a short term workaround, relax Kryo config to
|
// As a short term workaround, relax Kryo config to
|
||||||
// registrationRequired=false
|
// registrationRequired=false
|
||||||
@ -117,56 +152,92 @@ public class DistributedIntentStore
|
|||||||
log.info("Stopped");
|
log.info("Stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MetricsService metricsService() {
|
||||||
|
return metricsService;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentEvent createIntent(Intent intent) {
|
public IntentEvent createIntent(Intent intent) {
|
||||||
boolean absent = intents.putIfAbsent(intent.id(), intent);
|
Context timer = startTimer(createIntentTimer);
|
||||||
if (!absent) {
|
try {
|
||||||
// duplicate, ignore
|
boolean absent = intents.putIfAbsent(intent.id(), intent);
|
||||||
return null;
|
if (!absent) {
|
||||||
} else {
|
// duplicate, ignore
|
||||||
return this.setState(intent, IntentState.SUBMITTED);
|
return null;
|
||||||
|
} else {
|
||||||
|
return this.setState(intent, IntentState.SUBMITTED);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentEvent removeIntent(IntentId intentId) {
|
public IntentEvent removeIntent(IntentId intentId) {
|
||||||
Intent intent = intents.remove(intentId);
|
Context timer = startTimer(removeIntentTimer);
|
||||||
installable.remove(intentId);
|
try {
|
||||||
if (intent == null) {
|
Intent intent = intents.remove(intentId);
|
||||||
// was already removed
|
installable.remove(intentId);
|
||||||
return null;
|
if (intent == null) {
|
||||||
|
// was already removed
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
IntentEvent event = this.setState(intent, WITHDRAWN);
|
||||||
|
states.remove(intentId);
|
||||||
|
transientStates.remove(intentId);
|
||||||
|
// TODO: Should we callremoveInstalledIntents if this Intent was
|
||||||
|
return event;
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
}
|
}
|
||||||
IntentEvent event = this.setState(intent, WITHDRAWN);
|
|
||||||
states.remove(intentId);
|
|
||||||
transientStates.remove(intentId);
|
|
||||||
// TODO: Should we callremoveInstalledIntents if this Intent was
|
|
||||||
return event;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getIntentCount() {
|
public long getIntentCount() {
|
||||||
return intents.size();
|
Context timer = startTimer(getIntentCountTimer);
|
||||||
|
try {
|
||||||
|
return intents.size();
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterable<Intent> getIntents() {
|
public Iterable<Intent> getIntents() {
|
||||||
return ImmutableSet.copyOf(intents.values());
|
Context timer = startTimer(getIntentsTimer);
|
||||||
|
try {
|
||||||
|
return ImmutableSet.copyOf(intents.values());
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Intent getIntent(IntentId intentId) {
|
public Intent getIntent(IntentId intentId) {
|
||||||
return intents.get(intentId);
|
Context timer = startTimer(getIntentTimer);
|
||||||
|
try {
|
||||||
|
return intents.get(intentId);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentState getIntentState(IntentId id) {
|
public IntentState getIntentState(IntentId id) {
|
||||||
final IntentState localState = transientStates.get(id);
|
Context timer = startTimer(getIntentStateTimer);
|
||||||
if (localState != null) {
|
try {
|
||||||
return localState;
|
final IntentState localState = transientStates.get(id);
|
||||||
|
if (localState != null) {
|
||||||
|
return localState;
|
||||||
|
}
|
||||||
|
return states.get(id);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
}
|
}
|
||||||
return states.get(id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME temporary workaround until we fix our state machine
|
||||||
private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
|
private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
|
||||||
if (onlyLogTransitionError) {
|
if (onlyLogTransitionError) {
|
||||||
if (!expression) {
|
if (!expression) {
|
||||||
@ -179,89 +250,109 @@ public class DistributedIntentStore
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentEvent setState(Intent intent, IntentState state) {
|
public IntentEvent setState(Intent intent, IntentState state) {
|
||||||
final IntentId id = intent.id();
|
Context timer = startTimer(setStateTimer);
|
||||||
IntentEvent.Type evtType = null;
|
try {
|
||||||
final IntentState prevParking;
|
final IntentId id = intent.id();
|
||||||
boolean transitionedToParking = true;
|
IntentEvent.Type evtType = null;
|
||||||
boolean updated;
|
final IntentState prevParking;
|
||||||
|
boolean transitionedToParking = true;
|
||||||
|
boolean updated;
|
||||||
|
|
||||||
// parking state transition
|
// parking state transition
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case SUBMITTED:
|
case SUBMITTED:
|
||||||
prevParking = states.get(id);
|
prevParking = states.get(id);
|
||||||
if (prevParking == null) {
|
if (prevParking == null) {
|
||||||
updated = states.putIfAbsent(id, SUBMITTED);
|
updated = states.putIfAbsent(id, SUBMITTED);
|
||||||
verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
||||||
} else {
|
} else {
|
||||||
verify(prevParking == WITHDRAWN,
|
verify(prevParking == WITHDRAWN,
|
||||||
"Illegal state transition attempted from %s to SUBMITTED",
|
"Illegal state transition attempted from %s to SUBMITTED",
|
||||||
prevParking);
|
prevParking);
|
||||||
updated = states.replace(id, prevParking, SUBMITTED);
|
updated = states.replace(id, prevParking, SUBMITTED);
|
||||||
verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
||||||
|
}
|
||||||
|
evtType = IntentEvent.Type.SUBMITTED;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case INSTALLED:
|
||||||
|
prevParking = states.get(id);
|
||||||
|
verify(PRE_INSTALLED.contains(prevParking),
|
||||||
|
"Illegal state transition attempted from %s to INSTALLED",
|
||||||
|
prevParking);
|
||||||
|
updated = states.replace(id, prevParking, INSTALLED);
|
||||||
|
verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALLED);
|
||||||
|
evtType = IntentEvent.Type.INSTALLED;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case FAILED:
|
||||||
|
prevParking = states.get(id);
|
||||||
|
updated = states.replace(id, prevParking, FAILED);
|
||||||
|
verify(updated, "Conditional replace %s => %s failed", prevParking, FAILED);
|
||||||
|
evtType = IntentEvent.Type.FAILED;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case WITHDRAWN:
|
||||||
|
prevParking = states.get(id);
|
||||||
|
verify(PRE_WITHDRAWN.contains(prevParking),
|
||||||
|
"Illegal state transition attempted from %s to WITHDRAWN",
|
||||||
|
prevParking);
|
||||||
|
updated = states.replace(id, prevParking, WITHDRAWN);
|
||||||
|
verify(updated, "Conditional replace %s => %s failed", prevParking, WITHDRAWN);
|
||||||
|
evtType = IntentEvent.Type.WITHDRAWN;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
transitionedToParking = false;
|
||||||
|
prevParking = null;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (transitionedToParking) {
|
||||||
|
log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
|
||||||
|
// remove instance local state
|
||||||
|
transientStates.remove(id);
|
||||||
|
} else {
|
||||||
|
// Update instance local state, which includes non-parking state transition
|
||||||
|
final IntentState prevTransient = transientStates.put(id, state);
|
||||||
|
log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
|
||||||
}
|
}
|
||||||
evtType = IntentEvent.Type.SUBMITTED;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case INSTALLED:
|
if (evtType == null) {
|
||||||
prevParking = states.get(id);
|
return null;
|
||||||
verify(PRE_INSTALLED.contains(prevParking),
|
}
|
||||||
"Illegal state transition attempted from %s to INSTALLED",
|
return new IntentEvent(evtType, intent);
|
||||||
prevParking);
|
} finally {
|
||||||
updated = states.replace(id, prevParking, INSTALLED);
|
stopTimer(timer);
|
||||||
verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALLED);
|
|
||||||
evtType = IntentEvent.Type.INSTALLED;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case FAILED:
|
|
||||||
prevParking = states.get(id);
|
|
||||||
updated = states.replace(id, prevParking, FAILED);
|
|
||||||
verify(updated, "Conditional replace %s => %s failed", prevParking, FAILED);
|
|
||||||
evtType = IntentEvent.Type.FAILED;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case WITHDRAWN:
|
|
||||||
prevParking = states.get(id);
|
|
||||||
verify(PRE_WITHDRAWN.contains(prevParking),
|
|
||||||
"Illegal state transition attempted from %s to WITHDRAWN",
|
|
||||||
prevParking);
|
|
||||||
updated = states.replace(id, prevParking, WITHDRAWN);
|
|
||||||
verify(updated, "Conditional replace %s => %s failed", prevParking, WITHDRAWN);
|
|
||||||
evtType = IntentEvent.Type.WITHDRAWN;
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
transitionedToParking = false;
|
|
||||||
prevParking = null;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
if (transitionedToParking) {
|
|
||||||
log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
|
|
||||||
// remove instance local state
|
|
||||||
transientStates.remove(id);
|
|
||||||
} else {
|
|
||||||
// Update instance local state, which includes non-parking state transition
|
|
||||||
final IntentState prevTransient = transientStates.put(id, state);
|
|
||||||
log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (evtType == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return new IntentEvent(evtType, intent);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInstallableIntents(IntentId intentId, List<Intent> result) {
|
public void setInstallableIntents(IntentId intentId, List<Intent> result) {
|
||||||
installable.put(intentId, result);
|
Context timer = startTimer(setInstallableIntentsTimer);
|
||||||
|
try {
|
||||||
|
installable.put(intentId, result);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Intent> getInstallableIntents(IntentId intentId) {
|
public List<Intent> getInstallableIntents(IntentId intentId) {
|
||||||
return installable.get(intentId);
|
Context timer = startTimer(getInstallableIntentsTimer);
|
||||||
|
try {
|
||||||
|
return installable.get(intentId);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeInstalledIntents(IntentId intentId) {
|
public void removeInstalledIntents(IntentId intentId) {
|
||||||
installable.remove(intentId);
|
Context timer = startTimer(removeInstalledIntentsTimer);
|
||||||
|
try {
|
||||||
|
installable.remove(intentId);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.onlab.onos.store.intent.impl;
|
package org.onlab.onos.store.intent.impl;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
import com.codahale.metrics.Timer.Context;
|
||||||
import com.google.common.base.Verify;
|
import com.google.common.base.Verify;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.hazelcast.core.EntryAdapter;
|
import com.hazelcast.core.EntryAdapter;
|
||||||
@ -26,7 +28,11 @@ import com.hazelcast.core.Member;
|
|||||||
import org.apache.felix.scr.annotations.Activate;
|
import org.apache.felix.scr.annotations.Activate;
|
||||||
import org.apache.felix.scr.annotations.Component;
|
import org.apache.felix.scr.annotations.Component;
|
||||||
import org.apache.felix.scr.annotations.Deactivate;
|
import org.apache.felix.scr.annotations.Deactivate;
|
||||||
|
import org.apache.felix.scr.annotations.Reference;
|
||||||
|
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
||||||
import org.apache.felix.scr.annotations.Service;
|
import org.apache.felix.scr.annotations.Service;
|
||||||
|
import org.onlab.metrics.MetricsService;
|
||||||
|
import org.onlab.onos.core.MetricsHelper;
|
||||||
import org.onlab.onos.net.intent.Intent;
|
import org.onlab.onos.net.intent.Intent;
|
||||||
import org.onlab.onos.net.intent.IntentEvent;
|
import org.onlab.onos.net.intent.IntentEvent;
|
||||||
import org.onlab.onos.net.intent.IntentId;
|
import org.onlab.onos.net.intent.IntentId;
|
||||||
@ -48,12 +54,13 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
|
|
||||||
import static org.onlab.onos.net.intent.IntentState.*;
|
import static org.onlab.onos.net.intent.IntentState.*;
|
||||||
import static org.slf4j.LoggerFactory.getLogger;
|
import static org.slf4j.LoggerFactory.getLogger;
|
||||||
|
import static org.onlab.metrics.MetricsUtil.*;
|
||||||
|
|
||||||
@Component(immediate = true, enabled = false)
|
@Component(immediate = true, enabled = false)
|
||||||
@Service
|
@Service
|
||||||
public class HazelcastIntentStore
|
public class HazelcastIntentStore
|
||||||
extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
|
extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
|
||||||
implements IntentStore {
|
implements IntentStore, MetricsHelper {
|
||||||
|
|
||||||
/** Valid parking state, which can transition to INSTALLED. */
|
/** Valid parking state, which can transition to INSTALLED. */
|
||||||
private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(SUBMITTED, INSTALLED, FAILED);
|
private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(SUBMITTED, INSTALLED, FAILED);
|
||||||
@ -72,12 +79,41 @@ public class HazelcastIntentStore
|
|||||||
|
|
||||||
private SMap<IntentId, List<Intent>> installable;
|
private SMap<IntentId, List<Intent>> installable;
|
||||||
|
|
||||||
|
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||||
|
protected MetricsService metricsService;
|
||||||
|
|
||||||
// TODO make this configurable
|
// TODO make this configurable
|
||||||
private boolean onlyLogTransitionError = true;
|
private boolean onlyLogTransitionError = true;
|
||||||
|
|
||||||
|
private Timer createIntentTimer;
|
||||||
|
private Timer removeIntentTimer;
|
||||||
|
private Timer setInstallableIntentsTimer;
|
||||||
|
private Timer getInstallableIntentsTimer;
|
||||||
|
private Timer removeInstalledIntentsTimer;
|
||||||
|
private Timer setStateTimer;
|
||||||
|
private Timer getIntentCountTimer;
|
||||||
|
private Timer getIntentsTimer;
|
||||||
|
private Timer getIntentTimer;
|
||||||
|
private Timer getIntentStateTimer;
|
||||||
|
|
||||||
|
private Timer createResponseTimer(String methodName) {
|
||||||
|
return createTimer("IntentStore", methodName, "responseTime");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Activate
|
@Activate
|
||||||
public void activate() {
|
public void activate() {
|
||||||
|
createIntentTimer = createResponseTimer("createIntent");
|
||||||
|
removeIntentTimer = createResponseTimer("removeIntent");
|
||||||
|
setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
|
||||||
|
getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
|
||||||
|
removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
|
||||||
|
setStateTimer = createResponseTimer("setState");
|
||||||
|
getIntentCountTimer = createResponseTimer("getIntentCount");
|
||||||
|
getIntentsTimer = createResponseTimer("getIntents");
|
||||||
|
getIntentTimer = createResponseTimer("getIntent");
|
||||||
|
getIntentStateTimer = createResponseTimer("getIntentState");
|
||||||
|
|
||||||
// FIXME: We need a way to add serializer for intents which has been plugged-in.
|
// FIXME: We need a way to add serializer for intents which has been plugged-in.
|
||||||
// As a short term workaround, relax Kryo config to
|
// As a short term workaround, relax Kryo config to
|
||||||
// registrationRequired=false
|
// registrationRequired=false
|
||||||
@ -119,54 +155,89 @@ public class HazelcastIntentStore
|
|||||||
log.info("Stopped");
|
log.info("Stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MetricsService metricsService() {
|
||||||
|
return metricsService;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentEvent createIntent(Intent intent) {
|
public IntentEvent createIntent(Intent intent) {
|
||||||
Intent existing = intents.putIfAbsent(intent.id(), intent);
|
Context timer = startTimer(createIntentTimer);
|
||||||
if (existing != null) {
|
try {
|
||||||
// duplicate, ignore
|
Intent existing = intents.putIfAbsent(intent.id(), intent);
|
||||||
return null;
|
if (existing != null) {
|
||||||
} else {
|
// duplicate, ignore
|
||||||
return this.setState(intent, IntentState.SUBMITTED);
|
return null;
|
||||||
|
} else {
|
||||||
|
return this.setState(intent, IntentState.SUBMITTED);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentEvent removeIntent(IntentId intentId) {
|
public IntentEvent removeIntent(IntentId intentId) {
|
||||||
Intent intent = intents.remove(intentId);
|
Context timer = startTimer(removeIntentTimer);
|
||||||
installable.remove(intentId);
|
try {
|
||||||
if (intent == null) {
|
Intent intent = intents.remove(intentId);
|
||||||
// was already removed
|
installable.remove(intentId);
|
||||||
return null;
|
if (intent == null) {
|
||||||
|
// was already removed
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
IntentEvent event = this.setState(intent, WITHDRAWN);
|
||||||
|
states.remove(intentId);
|
||||||
|
transientStates.remove(intentId);
|
||||||
|
// TODO: Should we callremoveInstalledIntents if this Intent was
|
||||||
|
return event;
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
}
|
}
|
||||||
IntentEvent event = this.setState(intent, WITHDRAWN);
|
|
||||||
states.remove(intentId);
|
|
||||||
transientStates.remove(intentId);
|
|
||||||
// TODO: Should we callremoveInstalledIntents if this Intent was
|
|
||||||
return event;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getIntentCount() {
|
public long getIntentCount() {
|
||||||
return intents.size();
|
Context timer = startTimer(getIntentCountTimer);
|
||||||
|
try {
|
||||||
|
return intents.size();
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterable<Intent> getIntents() {
|
public Iterable<Intent> getIntents() {
|
||||||
return ImmutableSet.copyOf(intents.values());
|
Context timer = startTimer(getIntentsTimer);
|
||||||
|
try {
|
||||||
|
return ImmutableSet.copyOf(intents.values());
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Intent getIntent(IntentId intentId) {
|
public Intent getIntent(IntentId intentId) {
|
||||||
return intents.get(intentId);
|
Context timer = startTimer(getIntentTimer);
|
||||||
|
try {
|
||||||
|
return intents.get(intentId);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentState getIntentState(IntentId id) {
|
public IntentState getIntentState(IntentId id) {
|
||||||
final IntentState localState = transientStates.get(id);
|
Context timer = startTimer(getIntentStateTimer);
|
||||||
if (localState != null) {
|
try {
|
||||||
return localState;
|
final IntentState localState = transientStates.get(id);
|
||||||
|
if (localState != null) {
|
||||||
|
return localState;
|
||||||
|
}
|
||||||
|
return states.get(id);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
}
|
}
|
||||||
return states.get(id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
|
private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
|
||||||
@ -181,76 +252,97 @@ public class HazelcastIntentStore
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntentEvent setState(Intent intent, IntentState state) {
|
public IntentEvent setState(Intent intent, IntentState state) {
|
||||||
final IntentId id = intent.id();
|
Context timer = startTimer(setStateTimer);
|
||||||
IntentEvent.Type type = null;
|
try {
|
||||||
final IntentState prevParking;
|
|
||||||
boolean transientStateChangeOnly = false;
|
|
||||||
|
|
||||||
// parking state transition
|
final IntentId id = intent.id();
|
||||||
switch (state) {
|
IntentEvent.Type type = null;
|
||||||
case SUBMITTED:
|
final IntentState prevParking;
|
||||||
prevParking = states.get(id);
|
boolean transientStateChangeOnly = false;
|
||||||
if (prevParking == null) {
|
|
||||||
IntentState existing = states.putIfAbsent(id, SUBMITTED);
|
// parking state transition
|
||||||
verify(existing == null, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
switch (state) {
|
||||||
} else {
|
case SUBMITTED:
|
||||||
verify(prevParking == WITHDRAWN,
|
prevParking = states.get(id);
|
||||||
"Illegal state transition attempted from %s to SUBMITTED",
|
if (prevParking == null) {
|
||||||
prevParking);
|
IntentState existing = states.putIfAbsent(id, SUBMITTED);
|
||||||
boolean updated = states.replace(id, prevParking, SUBMITTED);
|
verify(existing == null, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
||||||
verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
} else {
|
||||||
|
verify(prevParking == WITHDRAWN,
|
||||||
|
"Illegal state transition attempted from %s to SUBMITTED",
|
||||||
|
prevParking);
|
||||||
|
boolean updated = states.replace(id, prevParking, SUBMITTED);
|
||||||
|
verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
|
||||||
|
}
|
||||||
|
type = IntentEvent.Type.SUBMITTED;
|
||||||
|
break;
|
||||||
|
case INSTALLED:
|
||||||
|
prevParking = states.replace(id, INSTALLED);
|
||||||
|
verify(PRE_INSTALLED.contains(prevParking),
|
||||||
|
"Illegal state transition attempted from %s to INSTALLED",
|
||||||
|
prevParking);
|
||||||
|
type = IntentEvent.Type.INSTALLED;
|
||||||
|
break;
|
||||||
|
case FAILED:
|
||||||
|
prevParking = states.replace(id, FAILED);
|
||||||
|
type = IntentEvent.Type.FAILED;
|
||||||
|
break;
|
||||||
|
case WITHDRAWN:
|
||||||
|
prevParking = states.replace(id, WITHDRAWN);
|
||||||
|
verify(PRE_WITHDRAWN.contains(prevParking),
|
||||||
|
"Illegal state transition attempted from %s to WITHDRAWN",
|
||||||
|
prevParking);
|
||||||
|
type = IntentEvent.Type.WITHDRAWN;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
transientStateChangeOnly = true;
|
||||||
|
prevParking = null;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
type = IntentEvent.Type.SUBMITTED;
|
if (!transientStateChangeOnly) {
|
||||||
break;
|
log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
|
||||||
case INSTALLED:
|
}
|
||||||
prevParking = states.replace(id, INSTALLED);
|
// Update instance local state, which includes non-parking state transition
|
||||||
verify(PRE_INSTALLED.contains(prevParking),
|
final IntentState prevTransient = transientStates.put(id, state);
|
||||||
"Illegal state transition attempted from %s to INSTALLED",
|
log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
|
||||||
prevParking);
|
|
||||||
type = IntentEvent.Type.INSTALLED;
|
|
||||||
break;
|
|
||||||
case FAILED:
|
|
||||||
prevParking = states.replace(id, FAILED);
|
|
||||||
type = IntentEvent.Type.FAILED;
|
|
||||||
break;
|
|
||||||
case WITHDRAWN:
|
|
||||||
prevParking = states.replace(id, WITHDRAWN);
|
|
||||||
verify(PRE_WITHDRAWN.contains(prevParking),
|
|
||||||
"Illegal state transition attempted from %s to WITHDRAWN",
|
|
||||||
prevParking);
|
|
||||||
type = IntentEvent.Type.WITHDRAWN;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
transientStateChangeOnly = true;
|
|
||||||
prevParking = null;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (!transientStateChangeOnly) {
|
|
||||||
log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
|
|
||||||
}
|
|
||||||
// Update instance local state, which includes non-parking state transition
|
|
||||||
final IntentState prevTransient = transientStates.put(id, state);
|
|
||||||
log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
|
|
||||||
|
|
||||||
if (type == null) {
|
if (type == null) {
|
||||||
return null;
|
return null;
|
||||||
|
}
|
||||||
|
return new IntentEvent(type, intent);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
}
|
}
|
||||||
return new IntentEvent(type, intent);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInstallableIntents(IntentId intentId, List<Intent> result) {
|
public void setInstallableIntents(IntentId intentId, List<Intent> result) {
|
||||||
installable.put(intentId, result);
|
Context timer = startTimer(setInstallableIntentsTimer);
|
||||||
|
try {
|
||||||
|
installable.put(intentId, result);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Intent> getInstallableIntents(IntentId intentId) {
|
public List<Intent> getInstallableIntents(IntentId intentId) {
|
||||||
return installable.get(intentId);
|
Context timer = startTimer(getInstallableIntentsTimer);
|
||||||
|
try {
|
||||||
|
return installable.get(intentId);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeInstalledIntents(IntentId intentId) {
|
public void removeInstalledIntents(IntentId intentId) {
|
||||||
installable.remove(intentId);
|
Context timer = startTimer(removeInstalledIntentsTimer);
|
||||||
|
try {
|
||||||
|
installable.remove(intentId);
|
||||||
|
} finally {
|
||||||
|
stopTimer(timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
|
public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user