diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java index 6d0f0fbd59..4b54cf3c64 100644 --- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java +++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java @@ -21,6 +21,7 @@ import org.onosproject.store.service.DocumentPath; import org.onosproject.store.service.Versioned; import java.util.Map; +import java.util.Set; /** * WorkflowContext Event Map Store. @@ -30,36 +31,43 @@ public interface ContextEventMapStore { /** * Registers workflow context event mapping. * @param eventType the class name of event - * @param eventHint event hint string value of the event + * @param eventHintSet Set of event hint string value of the event * @param contextName workflow context name * @param programCounterString the program counter of workflow * @throws WorkflowException workflow exception */ - void registerEventMap(String eventType, String eventHint, + void registerEventMap(String eventType, Set eventHintSet, String contextName, String programCounterString) throws WorkflowException; /** * Unregisters workflow context event mapping. * @param eventType the class name of event - * @param eventHint event hint string value of the event * @param contextName workflow context name * @throws WorkflowException workflow exception */ - void unregisterEventMap(String eventType, String eventHint, + void unregisterEventMap(String eventType, String contextName) throws WorkflowException; /** * Returns workflow context event mapping. * @param eventType the class name of event - * @param eventHint event hint string value of the event - * @return workflow context event mapping + * @param eventHint vent hint string value of the event + * @return Map of workflow context and value (program counter) * @throws WorkflowException workflow exception */ - Map getEventMap(String eventType, String eventHint) throws WorkflowException; + Map getEventMapByHint(String eventType, + String eventHint) throws WorkflowException; + + /** + * Returns true or false depending on existence of eventMap of given context. + * @param contextName name of workflow context + * @return Boolean true or false depending on existence of eventMap of given context + */ + boolean isEventMapPresent(String contextName); /** * Returns child nodes on document tree path. - * @param path document tree path + * @param path document tree path including eventType and Hint * @return children under document tree path * @throws WorkflowException workflow exception */ diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java index 05c564a71d..8f5d3c75a6 100644 --- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java +++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java @@ -21,6 +21,8 @@ import org.onlab.osgi.ServiceNotFoundException; import org.onosproject.event.Event; import java.net.URI; +import java.util.HashSet; +import java.util.Set; import static org.onosproject.workflow.api.CheckCondition.check; @@ -60,9 +62,9 @@ public class DefaultWorkflowContext extends WorkflowContext { private transient Class completionEventType; /** - * Completion event hint. + * Completion event hint Set. */ - private transient String completionEventHint; + private transient Set completionEventHintSet; /** * Completion event generator method reference. @@ -170,7 +172,18 @@ public class DefaultWorkflowContext extends WorkflowContext { public void waitCompletion(Class eventType, String eventHint, WorkExecutor eventGenerator, long timeoutMs) { this.completionEventType = eventType; - this.completionEventHint = eventHint; + this.completionEventHintSet = new HashSet<>(); + this.completionEventHintSet.add(eventHint); + this.completionEventGenerator = eventGenerator; + this.completionEventTimeoutMs = timeoutMs; + } + + @Override + public void waitAnyCompletion(Class eventType, Set eventHint, + WorkExecutor eventGenerator, long timeoutMs) { + this.completionEventType = eventType; + this.completionEventHintSet = new HashSet<>(); + this.completionEventHintSet.addAll(eventHint); this.completionEventGenerator = eventGenerator; this.completionEventTimeoutMs = timeoutMs; } @@ -186,8 +199,8 @@ public class DefaultWorkflowContext extends WorkflowContext { } @Override - public String completionEventHint() { - return completionEventHint; + public Set completionEventHints() { + return completionEventHintSet; } @Override diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java index 9aa870d2f4..2ee2672a01 100644 --- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java +++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java @@ -17,7 +17,9 @@ package org.onosproject.workflow.api; import com.google.common.base.MoreObjects; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import static org.onosproject.workflow.api.CheckCondition.check; @@ -32,9 +34,9 @@ public final class EventTimeoutTask extends HandlerTask { private final String eventType; /** - * Event hint value for finding target event. + * Set of Event hint value for finding target event. */ - private final String eventHint; + private final Set eventHintSet = new HashSet<>(); /** * Constructor of EventTimeoutTask. @@ -43,7 +45,7 @@ public final class EventTimeoutTask extends HandlerTask { private EventTimeoutTask(Builder builder) { super(builder); this.eventType = builder.eventType; - this.eventHint = builder.eventHint; + this.eventHintSet.addAll(builder.eventHintSet); } /** @@ -55,11 +57,11 @@ public final class EventTimeoutTask extends HandlerTask { } /** - * Gets event hint value for finding target event. - * @return event hint string + * Gets set of event hint value for finding target event. + * @return event hint set */ - public String eventHint() { - return eventHint; + public Set eventHintSet() { + return eventHintSet; } @Override @@ -76,7 +78,7 @@ public final class EventTimeoutTask extends HandlerTask { return false; } return Objects.equals(this.eventType(), ((EventTask) obj).eventType()) - && Objects.equals(this.eventHint(), ((EventTask) obj).eventHint()); + && Objects.equals(this.eventHintSet(), ((EventTask) obj).eventHint()); } @Override @@ -85,7 +87,7 @@ public final class EventTimeoutTask extends HandlerTask { .add("context", context()) .add("programCounter", programCounter()) .add("eventType", eventType()) - .add("eventHint", eventHint()) + .add("eventHint", eventHintSet()) .toString(); } @@ -107,9 +109,9 @@ public final class EventTimeoutTask extends HandlerTask { private String eventType; /** - * Event hint value for finding target event. + * Set of Event hint value for finding target event. */ - private String eventHint; + private Set eventHintSet; /** * Sets Event type (Class name of event). @@ -123,11 +125,11 @@ public final class EventTimeoutTask extends HandlerTask { /** * Sets event hint string for finding target event. - * @param eventHint event hint string + * @param eventHintSet Set of event hint string * @return builder of EventTimeoutTask */ - public Builder eventHint(String eventHint) { - this.eventHint = eventHint; + public Builder eventHintSet(Set eventHintSet) { + this.eventHintSet = eventHintSet; return this; } @@ -150,7 +152,7 @@ public final class EventTimeoutTask extends HandlerTask { */ public EventTimeoutTask build() throws WorkflowException { check(eventType != null, "eventType is invalid"); - check(eventHint != null, "eventType is invalid"); + check(eventHintSet != null, "eventHintSet is invalid"); return new EventTimeoutTask(this); } } diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java index 9f0b4fe0ea..836524dada 100644 --- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java +++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java @@ -18,6 +18,7 @@ package org.onosproject.workflow.api; import org.onosproject.event.Event; import java.net.URI; +import java.util.Set; /** * An abstract class representing WorkflowContext. @@ -98,6 +99,20 @@ public abstract class WorkflowContext extends WorkflowData { public abstract void waitCompletion(Class eventType, String eventHint, WorkExecutor eventGenerator, long timeoutMs); + + /** + * Waits an event which has any one of eventHint from Set 'eventHintSet' after executing executor. + * If the event happens, Worklet.isCompleted will be called. + * If the event does not happen for timeoutMs, Worklet.timeout will be called. + * @param eventType the class of event to wait + * @param eventHintSet the Set of eventHints of the event to wait + * @param eventGenerator a method reference to be executed after executing executor + * @param timeoutMs timeout millisecond + */ + public abstract void waitAnyCompletion(Class eventType, Set eventHintSet, + WorkExecutor eventGenerator, long timeoutMs); + + /** * Waits timeout milliseconds. After timeoutMs Worklet.timeout will be called. * @param timeoutMs timeout millisecond @@ -111,10 +126,10 @@ public abstract class WorkflowContext extends WorkflowData { public abstract Class completionEventType(); /** - * Returns the event hint string to wait. - * @return the event hint string + * Returns the set of event hint string to wait. + * @return the event hint string set */ - public abstract String completionEventHint(); + public abstract Set completionEventHints(); /** * Returns method reference for generating completion event. diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java index 9f609b3200..c5e00a7e15 100644 --- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java +++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java @@ -18,6 +18,8 @@ package org.onosproject.workflow.api; import org.onosproject.event.Event; import org.onosproject.event.ListenerService; +import java.util.Set; + /** * Interface for workflow execution service. */ @@ -45,11 +47,11 @@ public interface WorkflowExecutionService extends ListenerService eventType, String eventHint, + void registerEventMap(Class eventType, Set eventHintSet, String contextName, String programCounterString) throws WorkflowException; } diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java index 1b725de9db..8cd352e1c3 100644 --- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java +++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java @@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.WallClockTimestamp; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; @@ -43,6 +45,7 @@ import org.slf4j.Logger; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -63,6 +66,9 @@ public class DistributedContextEventMapTreeStore implements ContextEventMapStore private AsyncDocumentTree eventMapTree; + private EventuallyConsistentMap> hintSetPerCxtMap; + + @Activate public void activate() { @@ -78,36 +84,66 @@ public class DistributedContextEventMapTreeStore implements ContextEventMapStore .withName("context-event-map-store") .withOrdering(Ordering.INSERTION) .buildDocumentTree(); + + hintSetPerCxtMap = storageService.>eventuallyConsistentMapBuilder() + .withName("workflow-event-hint-per-cxt") + .withSerializer(eventMapNamespace) + .withTimestampProvider((k, v) -> new WallClockTimestamp()) + .build(); + log.info("Started"); } @Deactivate public void deactivate() { eventMapTree.destroy(); + hintSetPerCxtMap.destroy(); log.info("Stopped"); } @Override - public void registerEventMap(String eventType, String eventHint, + public void registerEventMap(String eventType, Set eventHintSet, String contextName, String programCounterString) throws WorkflowException { - DocumentPath dpath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName)); - String currentWorkletType = completeVersioned(eventMapTree.get(dpath)); - if (currentWorkletType == null) { - complete(eventMapTree.createRecursive(dpath, programCounterString)); - } else { - complete(eventMapTree.replace(dpath, programCounterString, currentWorkletType)); + for (String eventHint : eventHintSet) { + //Insert in eventCxtPerHintMap + DocumentPath dpathForCxt = DocumentPath.from(Lists.newArrayList( + "root", eventType, eventHint, contextName)); + String currentWorkletType = completeVersioned(eventMapTree.get(dpathForCxt)); + if (currentWorkletType == null) { + complete(eventMapTree.createRecursive(dpathForCxt, programCounterString)); + } else { + complete(eventMapTree.replace(dpathForCxt, programCounterString, currentWorkletType)); + } + log.trace("RegisterEventMap for eventType:{}, eventSet:{}, contextName:{}, pc:{}", + eventType, eventHintSet, contextName, programCounterString); + } + hintSetPerCxtMap.put(contextName, eventHintSet); + log.trace("RegisterEventMap in hintSetPerCxt for " + + "eventType:{}, eventSet:{}, contextName:{}, pc:{}", + eventType, eventHintSet, contextName, programCounterString); } @Override - public void unregisterEventMap(String eventType, String eventHint, String contextName) throws WorkflowException { - DocumentPath contextPath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName)); - complete(eventMapTree.removeNode(contextPath)); + public void unregisterEventMap(String eventType, String contextName) + throws WorkflowException { + + Set hints = hintSetPerCxtMap.get(contextName); + for (String eventHint : hints) { + //Remove from eventCxtPerHintMap + complete(eventMapTree.removeNode(DocumentPath.from(Lists.newArrayList( + "root", eventType, eventHint, contextName)))); + log.trace("UnregisterEventMap from eventCxtPerHintMap for eventType:{}, eventSet:{}, contextName:{}", + eventType, eventHint, contextName); + } + hintSetPerCxtMap.remove(contextName); } + @Override - public Map getEventMap(String eventType, String eventHint) throws WorkflowException { - DocumentPath path = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint)); + public Map getEventMapByHint(String eventType, String eventHint) throws WorkflowException { + DocumentPath path = DocumentPath.from( + Lists.newArrayList("root", eventType, eventHint)); Map> contexts = complete(eventMapTree.getChildren(path)); Map eventMap = Maps.newHashMap(); if (Objects.isNull(contexts)) { @@ -117,9 +153,24 @@ public class DistributedContextEventMapTreeStore implements ContextEventMapStore for (Map.Entry> entry : contexts.entrySet()) { eventMap.put(entry.getKey(), entry.getValue().value()); } + log.trace("getEventMapByHint returns eventMap {} ", eventMap); return eventMap; } + @Override + public boolean isEventMapPresent(String contextName) { + Map eventMap = Maps.newHashMap(); + Set eventHintSet = hintSetPerCxtMap.get(contextName); + if (Objects.nonNull(eventHintSet)) { + log.trace("EventMap present for Context:{}", contextName); + return true; + } else { + log.trace("EventMap Doesnt exist for Context:{}", contextName); + return false; + } + } + + @Override public Map> getChildren(String path) throws WorkflowException { DocumentPath dpath = DocumentPath.from(path); diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java index 237f70edb6..b472c5f74b 100644 --- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java +++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java @@ -16,7 +16,6 @@ package org.onosproject.workflow.impl; import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.google.common.collect.Lists; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.LeadershipService; import org.onosproject.cluster.NodeId; @@ -64,6 +63,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -242,7 +242,7 @@ public class WorkFlowEngine extends AbstractListenerManager eventType, String eventHint, + public void registerEventMap(Class eventType, Set eventHintSet, String contextName, String programCounterString) throws WorkflowException { - eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, programCounterString); - for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) { - Map eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint); - if (eventMap != null && eventMap.containsKey(contextName)) { - return; - } - try { - log.info("sleep {}", i); - Thread.sleep(10L * (i + 1)); - } catch (InterruptedException e) { - log.error("Exception: ", e); - Thread.currentThread().interrupt(); + eventMapStore.registerEventMap(eventType.getName(), eventHintSet, contextName, programCounterString); + for (String eventHint : eventHintSet) { + for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) { + Map eventMap = eventMapStore.getEventMapByHint(eventType.getName(), eventHint); + if (eventMap != null && eventMap.containsKey(contextName)) { + break; + } + try { + log.info("sleep {}", i); + Thread.sleep(10L * (i + 1)); + } catch (InterruptedException e) { + log.error("Exception: ", e); + Thread.currentThread().interrupt(); + } } } + } @Override @@ -442,19 +445,8 @@ public class WorkFlowEngine extends AbstractListenerManager eventMap = null; - try { - eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint()); - } catch (WorkflowException e) { - log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace())); - return task; - } - - if (Objects.isNull(eventMap) || eventMap.isEmpty()) { - return task; - } - - if (Objects.isNull(eventMap.get(task.context().name()))) { + if (!eventMapStore.isEventMapPresent(task.context().name())) { + log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name()); return task; } @@ -501,10 +493,10 @@ public class WorkFlowEngine extends AbstractListenerManager eventMap = null; - try { - eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint()); - } catch (WorkflowException e) { - log.error("execEventTimeoutTask: Exception: {}, trace: {}", - e, Lists.newArrayList(e.getStackTrace())); - return task; - } - - if (Objects.isNull(eventMap) || eventMap.isEmpty()) { - return task; - } - - if (Objects.isNull(eventMap.get(task.context().name()))) { + if (!eventMapStore.isEventMapPresent(task.context().name())) { + log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name()); return task; } @@ -590,8 +570,7 @@ public class WorkFlowEngine extends AbstractListenerManager {