mirror of
https://github.com/opennetworkinglab/onos.git
synced 2026-05-05 12:16:13 +02:00
multi event support for a worklet in workflow app
Change-Id: I3178110da75b26f96f8889acc0dd2c715fc567ec
This commit is contained in:
parent
5e8a22a987
commit
f3f94c6468
@ -21,6 +21,7 @@ import org.onosproject.store.service.DocumentPath;
|
||||
import org.onosproject.store.service.Versioned;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* WorkflowContext Event Map Store.
|
||||
@ -30,36 +31,43 @@ public interface ContextEventMapStore {
|
||||
/**
|
||||
* Registers workflow context event mapping.
|
||||
* @param eventType the class name of event
|
||||
* @param eventHint event hint string value of the event
|
||||
* @param eventHintSet Set of event hint string value of the event
|
||||
* @param contextName workflow context name
|
||||
* @param programCounterString the program counter of workflow
|
||||
* @throws WorkflowException workflow exception
|
||||
*/
|
||||
void registerEventMap(String eventType, String eventHint,
|
||||
void registerEventMap(String eventType, Set<String> eventHintSet,
|
||||
String contextName, String programCounterString) throws WorkflowException;
|
||||
|
||||
/**
|
||||
* Unregisters workflow context event mapping.
|
||||
* @param eventType the class name of event
|
||||
* @param eventHint event hint string value of the event
|
||||
* @param contextName workflow context name
|
||||
* @throws WorkflowException workflow exception
|
||||
*/
|
||||
void unregisterEventMap(String eventType, String eventHint,
|
||||
void unregisterEventMap(String eventType,
|
||||
String contextName) throws WorkflowException;
|
||||
|
||||
/**
|
||||
* Returns workflow context event mapping.
|
||||
* @param eventType the class name of event
|
||||
* @param eventHint event hint string value of the event
|
||||
* @return workflow context event mapping
|
||||
* @param eventHint vent hint string value of the event
|
||||
* @return Map of workflow context and value (program counter)
|
||||
* @throws WorkflowException workflow exception
|
||||
*/
|
||||
Map<String, String> getEventMap(String eventType, String eventHint) throws WorkflowException;
|
||||
Map<String, String> getEventMapByHint(String eventType,
|
||||
String eventHint) throws WorkflowException;
|
||||
|
||||
/**
|
||||
* Returns true or false depending on existence of eventMap of given context.
|
||||
* @param contextName name of workflow context
|
||||
* @return Boolean true or false depending on existence of eventMap of given context
|
||||
*/
|
||||
boolean isEventMapPresent(String contextName);
|
||||
|
||||
/**
|
||||
* Returns child nodes on document tree path.
|
||||
* @param path document tree path
|
||||
* @param path document tree path including eventType and Hint
|
||||
* @return children under document tree path
|
||||
* @throws WorkflowException workflow exception
|
||||
*/
|
||||
|
||||
@ -21,6 +21,8 @@ import org.onlab.osgi.ServiceNotFoundException;
|
||||
import org.onosproject.event.Event;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.onosproject.workflow.api.CheckCondition.check;
|
||||
|
||||
@ -60,9 +62,9 @@ public class DefaultWorkflowContext extends WorkflowContext {
|
||||
private transient Class<? extends Event> completionEventType;
|
||||
|
||||
/**
|
||||
* Completion event hint.
|
||||
* Completion event hint Set.
|
||||
*/
|
||||
private transient String completionEventHint;
|
||||
private transient Set<String> completionEventHintSet;
|
||||
|
||||
/**
|
||||
* Completion event generator method reference.
|
||||
@ -170,7 +172,18 @@ public class DefaultWorkflowContext extends WorkflowContext {
|
||||
public void waitCompletion(Class<? extends Event> eventType, String eventHint,
|
||||
WorkExecutor eventGenerator, long timeoutMs) {
|
||||
this.completionEventType = eventType;
|
||||
this.completionEventHint = eventHint;
|
||||
this.completionEventHintSet = new HashSet<>();
|
||||
this.completionEventHintSet.add(eventHint);
|
||||
this.completionEventGenerator = eventGenerator;
|
||||
this.completionEventTimeoutMs = timeoutMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitAnyCompletion(Class<? extends Event> eventType, Set<String> eventHint,
|
||||
WorkExecutor eventGenerator, long timeoutMs) {
|
||||
this.completionEventType = eventType;
|
||||
this.completionEventHintSet = new HashSet<>();
|
||||
this.completionEventHintSet.addAll(eventHint);
|
||||
this.completionEventGenerator = eventGenerator;
|
||||
this.completionEventTimeoutMs = timeoutMs;
|
||||
}
|
||||
@ -186,8 +199,8 @@ public class DefaultWorkflowContext extends WorkflowContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String completionEventHint() {
|
||||
return completionEventHint;
|
||||
public Set<String> completionEventHints() {
|
||||
return completionEventHintSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -17,7 +17,9 @@ package org.onosproject.workflow.api;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.onosproject.workflow.api.CheckCondition.check;
|
||||
|
||||
@ -32,9 +34,9 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
private final String eventType;
|
||||
|
||||
/**
|
||||
* Event hint value for finding target event.
|
||||
* Set of Event hint value for finding target event.
|
||||
*/
|
||||
private final String eventHint;
|
||||
private final Set<String> eventHintSet = new HashSet<>();
|
||||
|
||||
/**
|
||||
* Constructor of EventTimeoutTask.
|
||||
@ -43,7 +45,7 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
private EventTimeoutTask(Builder builder) {
|
||||
super(builder);
|
||||
this.eventType = builder.eventType;
|
||||
this.eventHint = builder.eventHint;
|
||||
this.eventHintSet.addAll(builder.eventHintSet);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -55,11 +57,11 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets event hint value for finding target event.
|
||||
* @return event hint string
|
||||
* Gets set of event hint value for finding target event.
|
||||
* @return event hint set
|
||||
*/
|
||||
public String eventHint() {
|
||||
return eventHint;
|
||||
public Set<String> eventHintSet() {
|
||||
return eventHintSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -76,7 +78,7 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
return false;
|
||||
}
|
||||
return Objects.equals(this.eventType(), ((EventTask) obj).eventType())
|
||||
&& Objects.equals(this.eventHint(), ((EventTask) obj).eventHint());
|
||||
&& Objects.equals(this.eventHintSet(), ((EventTask) obj).eventHint());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -85,7 +87,7 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
.add("context", context())
|
||||
.add("programCounter", programCounter())
|
||||
.add("eventType", eventType())
|
||||
.add("eventHint", eventHint())
|
||||
.add("eventHint", eventHintSet())
|
||||
.toString();
|
||||
}
|
||||
|
||||
@ -107,9 +109,9 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
private String eventType;
|
||||
|
||||
/**
|
||||
* Event hint value for finding target event.
|
||||
* Set of Event hint value for finding target event.
|
||||
*/
|
||||
private String eventHint;
|
||||
private Set<String> eventHintSet;
|
||||
|
||||
/**
|
||||
* Sets Event type (Class name of event).
|
||||
@ -123,11 +125,11 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
|
||||
/**
|
||||
* Sets event hint string for finding target event.
|
||||
* @param eventHint event hint string
|
||||
* @param eventHintSet Set of event hint string
|
||||
* @return builder of EventTimeoutTask
|
||||
*/
|
||||
public Builder eventHint(String eventHint) {
|
||||
this.eventHint = eventHint;
|
||||
public Builder eventHintSet(Set<String> eventHintSet) {
|
||||
this.eventHintSet = eventHintSet;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -150,7 +152,7 @@ public final class EventTimeoutTask extends HandlerTask {
|
||||
*/
|
||||
public EventTimeoutTask build() throws WorkflowException {
|
||||
check(eventType != null, "eventType is invalid");
|
||||
check(eventHint != null, "eventType is invalid");
|
||||
check(eventHintSet != null, "eventHintSet is invalid");
|
||||
return new EventTimeoutTask(this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ package org.onosproject.workflow.api;
|
||||
import org.onosproject.event.Event;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* An abstract class representing WorkflowContext.
|
||||
@ -98,6 +99,20 @@ public abstract class WorkflowContext extends WorkflowData {
|
||||
public abstract void waitCompletion(Class<? extends Event> eventType, String eventHint,
|
||||
WorkExecutor eventGenerator, long timeoutMs);
|
||||
|
||||
|
||||
/**
|
||||
* Waits an event which has any one of eventHint from Set 'eventHintSet' after executing executor.
|
||||
* If the event happens, Worklet.isCompleted will be called.
|
||||
* If the event does not happen for timeoutMs, Worklet.timeout will be called.
|
||||
* @param eventType the class of event to wait
|
||||
* @param eventHintSet the Set of eventHints of the event to wait
|
||||
* @param eventGenerator a method reference to be executed after executing executor
|
||||
* @param timeoutMs timeout millisecond
|
||||
*/
|
||||
public abstract void waitAnyCompletion(Class<? extends Event> eventType, Set<String> eventHintSet,
|
||||
WorkExecutor eventGenerator, long timeoutMs);
|
||||
|
||||
|
||||
/**
|
||||
* Waits timeout milliseconds. After timeoutMs Worklet.timeout will be called.
|
||||
* @param timeoutMs timeout millisecond
|
||||
@ -111,10 +126,10 @@ public abstract class WorkflowContext extends WorkflowData {
|
||||
public abstract Class<? extends Event> completionEventType();
|
||||
|
||||
/**
|
||||
* Returns the event hint string to wait.
|
||||
* @return the event hint string
|
||||
* Returns the set of event hint string to wait.
|
||||
* @return the event hint string set
|
||||
*/
|
||||
public abstract String completionEventHint();
|
||||
public abstract Set<String> completionEventHints();
|
||||
|
||||
/**
|
||||
* Returns method reference for generating completion event.
|
||||
|
||||
@ -18,6 +18,8 @@ package org.onosproject.workflow.api;
|
||||
import org.onosproject.event.Event;
|
||||
import org.onosproject.event.ListenerService;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Interface for workflow execution service.
|
||||
*/
|
||||
@ -45,11 +47,11 @@ public interface WorkflowExecutionService extends ListenerService<WorkflowDataEv
|
||||
/**
|
||||
* Registers workflow event map.
|
||||
* @param eventType event type (class name of event)
|
||||
* @param eventHint event hint value
|
||||
* @param eventHintSet Set of event hint value
|
||||
* @param contextName workflow context name to be called by this event map
|
||||
* @param programCounterString worklet type to be called by this event map
|
||||
* @throws WorkflowException workflow exception
|
||||
*/
|
||||
void registerEventMap(Class<? extends Event> eventType, String eventHint,
|
||||
void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
|
||||
String contextName, String programCounterString) throws WorkflowException;
|
||||
}
|
||||
|
||||
@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.onosproject.store.service.EventuallyConsistentMap;
|
||||
import org.onosproject.store.service.WallClockTimestamp;
|
||||
import org.osgi.service.component.annotations.Activate;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
import org.osgi.service.component.annotations.Deactivate;
|
||||
@ -43,6 +45,7 @@ import org.slf4j.Logger;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@ -63,6 +66,9 @@ public class DistributedContextEventMapTreeStore implements ContextEventMapStore
|
||||
|
||||
private AsyncDocumentTree<String> eventMapTree;
|
||||
|
||||
private EventuallyConsistentMap<String, Set<String>> hintSetPerCxtMap;
|
||||
|
||||
|
||||
@Activate
|
||||
public void activate() {
|
||||
|
||||
@ -78,36 +84,66 @@ public class DistributedContextEventMapTreeStore implements ContextEventMapStore
|
||||
.withName("context-event-map-store")
|
||||
.withOrdering(Ordering.INSERTION)
|
||||
.buildDocumentTree();
|
||||
|
||||
hintSetPerCxtMap = storageService.<String, Set<String>>eventuallyConsistentMapBuilder()
|
||||
.withName("workflow-event-hint-per-cxt")
|
||||
.withSerializer(eventMapNamespace)
|
||||
.withTimestampProvider((k, v) -> new WallClockTimestamp())
|
||||
.build();
|
||||
|
||||
log.info("Started");
|
||||
}
|
||||
|
||||
@Deactivate
|
||||
public void deactivate() {
|
||||
eventMapTree.destroy();
|
||||
hintSetPerCxtMap.destroy();
|
||||
log.info("Stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerEventMap(String eventType, String eventHint,
|
||||
public void registerEventMap(String eventType, Set<String> eventHintSet,
|
||||
String contextName, String programCounterString) throws WorkflowException {
|
||||
DocumentPath dpath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
|
||||
String currentWorkletType = completeVersioned(eventMapTree.get(dpath));
|
||||
if (currentWorkletType == null) {
|
||||
complete(eventMapTree.createRecursive(dpath, programCounterString));
|
||||
} else {
|
||||
complete(eventMapTree.replace(dpath, programCounterString, currentWorkletType));
|
||||
for (String eventHint : eventHintSet) {
|
||||
//Insert in eventCxtPerHintMap
|
||||
DocumentPath dpathForCxt = DocumentPath.from(Lists.newArrayList(
|
||||
"root", eventType, eventHint, contextName));
|
||||
String currentWorkletType = completeVersioned(eventMapTree.get(dpathForCxt));
|
||||
if (currentWorkletType == null) {
|
||||
complete(eventMapTree.createRecursive(dpathForCxt, programCounterString));
|
||||
} else {
|
||||
complete(eventMapTree.replace(dpathForCxt, programCounterString, currentWorkletType));
|
||||
}
|
||||
log.trace("RegisterEventMap for eventType:{}, eventSet:{}, contextName:{}, pc:{}",
|
||||
eventType, eventHintSet, contextName, programCounterString);
|
||||
|
||||
}
|
||||
hintSetPerCxtMap.put(contextName, eventHintSet);
|
||||
log.trace("RegisterEventMap in hintSetPerCxt for " +
|
||||
"eventType:{}, eventSet:{}, contextName:{}, pc:{}",
|
||||
eventType, eventHintSet, contextName, programCounterString);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterEventMap(String eventType, String eventHint, String contextName) throws WorkflowException {
|
||||
DocumentPath contextPath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
|
||||
complete(eventMapTree.removeNode(contextPath));
|
||||
public void unregisterEventMap(String eventType, String contextName)
|
||||
throws WorkflowException {
|
||||
|
||||
Set<String> hints = hintSetPerCxtMap.get(contextName);
|
||||
for (String eventHint : hints) {
|
||||
//Remove from eventCxtPerHintMap
|
||||
complete(eventMapTree.removeNode(DocumentPath.from(Lists.newArrayList(
|
||||
"root", eventType, eventHint, contextName))));
|
||||
log.trace("UnregisterEventMap from eventCxtPerHintMap for eventType:{}, eventSet:{}, contextName:{}",
|
||||
eventType, eventHint, contextName);
|
||||
}
|
||||
hintSetPerCxtMap.remove(contextName);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, String> getEventMap(String eventType, String eventHint) throws WorkflowException {
|
||||
DocumentPath path = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint));
|
||||
public Map<String, String> getEventMapByHint(String eventType, String eventHint) throws WorkflowException {
|
||||
DocumentPath path = DocumentPath.from(
|
||||
Lists.newArrayList("root", eventType, eventHint));
|
||||
Map<String, Versioned<String>> contexts = complete(eventMapTree.getChildren(path));
|
||||
Map<String, String> eventMap = Maps.newHashMap();
|
||||
if (Objects.isNull(contexts)) {
|
||||
@ -117,9 +153,24 @@ public class DistributedContextEventMapTreeStore implements ContextEventMapStore
|
||||
for (Map.Entry<String, Versioned<String>> entry : contexts.entrySet()) {
|
||||
eventMap.put(entry.getKey(), entry.getValue().value());
|
||||
}
|
||||
log.trace("getEventMapByHint returns eventMap {} ", eventMap);
|
||||
return eventMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEventMapPresent(String contextName) {
|
||||
Map<String, String> eventMap = Maps.newHashMap();
|
||||
Set<String> eventHintSet = hintSetPerCxtMap.get(contextName);
|
||||
if (Objects.nonNull(eventHintSet)) {
|
||||
log.trace("EventMap present for Context:{}", contextName);
|
||||
return true;
|
||||
} else {
|
||||
log.trace("EventMap Doesnt exist for Context:{}", contextName);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, Versioned<String>> getChildren(String path) throws WorkflowException {
|
||||
DocumentPath dpath = DocumentPath.from(path);
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
package org.onosproject.workflow.impl;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.onosproject.cluster.ClusterService;
|
||||
import org.onosproject.cluster.LeadershipService;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
@ -64,6 +63,7 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@ -242,7 +242,7 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
}
|
||||
|
||||
try {
|
||||
eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
|
||||
eventMap = eventMapStore.getEventMapByHint(event.getClass().getName(), eventHint);
|
||||
} catch (WorkflowException e) {
|
||||
log.error("Exception: ", e);
|
||||
return;
|
||||
@ -290,22 +290,25 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerEventMap(Class<? extends Event> eventType, String eventHint,
|
||||
public void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
|
||||
String contextName, String programCounterString) throws WorkflowException {
|
||||
eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, programCounterString);
|
||||
for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
|
||||
Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
|
||||
if (eventMap != null && eventMap.containsKey(contextName)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info("sleep {}", i);
|
||||
Thread.sleep(10L * (i + 1));
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Exception: ", e);
|
||||
Thread.currentThread().interrupt();
|
||||
eventMapStore.registerEventMap(eventType.getName(), eventHintSet, contextName, programCounterString);
|
||||
for (String eventHint : eventHintSet) {
|
||||
for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
|
||||
Map<String, String> eventMap = eventMapStore.getEventMapByHint(eventType.getName(), eventHint);
|
||||
if (eventMap != null && eventMap.containsKey(contextName)) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
log.info("sleep {}", i);
|
||||
Thread.sleep(10L * (i + 1));
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Exception: ", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -442,19 +445,8 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
*/
|
||||
private EventTask execEventTask(EventTask task) {
|
||||
|
||||
Map<String, String> eventMap = null;
|
||||
try {
|
||||
eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
|
||||
} catch (WorkflowException e) {
|
||||
log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
|
||||
return task;
|
||||
}
|
||||
|
||||
if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
|
||||
return task;
|
||||
}
|
||||
|
||||
if (Objects.isNull(eventMap.get(task.context().name()))) {
|
||||
if (!eventMapStore.isEventMapPresent(task.context().name())) {
|
||||
log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
|
||||
return task;
|
||||
}
|
||||
|
||||
@ -501,10 +493,10 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
|
||||
|
||||
eventMapStore.unregisterEventMap(
|
||||
task.eventType(), task.eventHint(), latestContext.name());
|
||||
task.eventType(), latestContext.name());
|
||||
|
||||
//completed case
|
||||
// increase program counter
|
||||
//increase program counter
|
||||
ProgramCounter pc = latestContext.current();
|
||||
latestContext.setCurrent(workflow.increased(pc));
|
||||
|
||||
@ -543,20 +535,8 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
*/
|
||||
private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
|
||||
|
||||
Map<String, String> eventMap = null;
|
||||
try {
|
||||
eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
|
||||
} catch (WorkflowException e) {
|
||||
log.error("execEventTimeoutTask: Exception: {}, trace: {}",
|
||||
e, Lists.newArrayList(e.getStackTrace()));
|
||||
return task;
|
||||
}
|
||||
|
||||
if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
|
||||
return task;
|
||||
}
|
||||
|
||||
if (Objects.isNull(eventMap.get(task.context().name()))) {
|
||||
if (!eventMapStore.isEventMapPresent(task.context().name())) {
|
||||
log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
|
||||
return task;
|
||||
}
|
||||
|
||||
@ -590,8 +570,7 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
|
||||
initWorkletExecution(latestContext);
|
||||
|
||||
eventMapStore.unregisterEventMap(
|
||||
task.eventType(), task.eventHint(), latestContext.name());
|
||||
eventMapStore.unregisterEventMap(task.eventType(), latestContext.name());
|
||||
|
||||
log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
|
||||
log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
|
||||
@ -796,7 +775,7 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
throw new WorkflowException(msg);
|
||||
}
|
||||
|
||||
registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
|
||||
registerEventMap(latestContext.completionEventType(), latestContext.completionEventHints(),
|
||||
latestContext.name(), pc.toString());
|
||||
|
||||
latestContext.completionEventGenerator().apply();
|
||||
@ -806,7 +785,7 @@ public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, W
|
||||
.context(latestContext)
|
||||
.programCounter(pc)
|
||||
.eventType(latestContext.completionEventType().getName())
|
||||
.eventHint(latestContext.completionEventHint())
|
||||
.eventHintSet(latestContext.completionEventHints())
|
||||
.build();
|
||||
timerChain.schedule(latestContext.completionEventTimeout(),
|
||||
() -> {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user