From e8f48d8d080b8fa57fe81828178bbc550e5cc6af Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 24 Aug 2016 20:27:48 -0700 Subject: [PATCH] ExecutorService with somewhat predictable thread assignment. - ExecutorService which allows the caller or the Task to express hint about which Thread it needs to be executed. Change-Id: If1cc58f6b2369bb5afce4f402c195eacedf67f05 --- .../org/onlab/util/PredictableExecutor.java | 333 ++++++++++++++++++ .../onlab/util/PredictableExecutorTest.java | 124 +++++++ 2 files changed, 457 insertions(+) create mode 100644 utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java create mode 100644 utils/misc/src/test/java/org/onlab/util/PredictableExecutorTest.java diff --git a/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java new file mode 100644 index 0000000000..836d3ae478 --- /dev/null +++ b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java @@ -0,0 +1,333 @@ +/* + * Copyright 2016-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * (Somewhat) predictable ExecutorService. + *

+ * ExecutorService which behaves similar to the one created by + * {@link Executors#newFixedThreadPool(int, ThreadFactory)}, + * but assigns command to specific thread based on + * it's {@link PickyTask#hint()}, {@link Object#hashCode()}, or hint value explicitly + * specified when the command was passed to this {@link PredictableExecutor}. + */ +public class PredictableExecutor + extends AbstractExecutorService + implements ExecutorService { + + private final List backends; + + /** + * Creates {@link PredictableExecutor} instance. + * + * @param buckets number of buckets or 0 to match available processors + * @param threadFactory {@link ThreadFactory} to use to create threads + * @return {@link PredictableExecutor} + */ + public static PredictableExecutor newPredictableExecutor(int buckets, ThreadFactory threadFactory) { + return new PredictableExecutor(buckets, threadFactory); + } + + /** + * Creates {@link PredictableExecutor} instance. + * + * @param buckets number of buckets or 0 to match available processors + * @param threadFactory {@link ThreadFactory} to use to create threads + */ + public PredictableExecutor(int buckets, ThreadFactory threadFactory) { + checkArgument(buckets >= 0, "number of buckets must be non zero"); + checkNotNull(threadFactory); + if (buckets == 0) { + buckets = Runtime.getRuntime().availableProcessors(); + } + this.backends = new ArrayList<>(buckets); + + for (int i = 0; i < buckets; ++i) { + this.backends.add(backendExecutorService(threadFactory)); + } + } + + /** + * Creates {@link PredictableExecutor} instance with + * bucket size set to number of available processors. + * + * @param threadFactory {@link ThreadFactory} to use to create threads + */ + public PredictableExecutor(ThreadFactory threadFactory) { + this(0, threadFactory); + } + + /** + * Creates a single thread {@link ExecutorService} to use in the backend. + * + * @param threadFactory {@link ThreadFactory} to use to create threads + * @return single thread {@link ExecutorService} + */ + protected ExecutorService backendExecutorService(ThreadFactory threadFactory) { + return Executors.newSingleThreadExecutor(threadFactory); + } + + + /** + * Executes given command at some time in the future. + * + * @param command the {@link Runnable} task + * @param hint value to pick thread to run on. + */ + public void execute(Runnable command, int hint) { + int index = Math.abs(hint) % backends.size(); + backends.get(index).execute(command); + } + + /** + * Executes given command at some time in the future. + * + * @param command the {@link Runnable} task + * @param hintFunction Function to compute hint value + */ + public void execute(Runnable command, Function hintFunction) { + execute(command, hintFunction.apply(command)); + } + + + private static int hint(Runnable command) { + if (command instanceof PickyTask) { + return ((PickyTask) command).hint(); + } else { + return Objects.hashCode(command); + } + } + + @Override + public void execute(Runnable command) { + execute(command, PredictableExecutor::hint); + } + + @Override + public void shutdown() { + backends.stream().forEach(ExecutorService::shutdown); + } + + @Override + public List shutdownNow() { + return backends.stream() + .map(ExecutorService::shutdownNow) + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + @Override + public boolean isShutdown() { + return backends.stream().allMatch(ExecutorService::isShutdown); + } + + @Override + public boolean isTerminated() { + return backends.stream().allMatch(ExecutorService::isTerminated); + } + + /** + * {@inheritDoc} + *

+ * Note: It'll try, but is not assured that the method will return by specified timeout. + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + + final Duration timeoutD = Duration.of(unit.toMillis(timeout), ChronoUnit.MILLIS); + final Instant start = Instant.now(); + + return backends.parallelStream() + .filter(es -> !es.isTerminated()) + .map(es -> { + long timeoutMs = timeoutD.minus(Duration.between(Instant.now(), start)).toMillis(); + try { + return es.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + }) + .allMatch(result -> result); + } + + @Override + protected PickyFutureTask newTaskFor(Callable callable) { + return new PickyFutureTask<>(callable); + } + + @Override + protected PickyFutureTask newTaskFor(Runnable runnable, T value) { + return new PickyFutureTask<>(runnable, value); + } + + /** + * {@link Runnable} also implementing {@link PickyTask}. + */ + public static interface PickyRunnable extends PickyTask, Runnable { } + + /** + * {@link Callable} also implementing {@link PickyTask}. + * + * @param result type + */ + public static interface PickyCallable extends PickyTask, Callable { } + + /** + * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint. + * + * @param runnable {@link Runnable} + * @param hint hint value + * @return {@link PickyRunnable} + */ + public static PickyRunnable picky(Runnable runnable, int hint) { + return picky(runnable, (r) -> hint); + } + + /** + * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint. + * + * @param runnable {@link Runnable} + * @param hint hint function + * @return {@link PickyRunnable} + */ + public static PickyRunnable picky(Runnable runnable, Function hint) { + checkNotNull(runnable); + checkNotNull(hint); + return new PickyRunnable() { + + @Override + public void run() { + runnable.run(); + } + + @Override + public int hint() { + return hint.apply(runnable); + } + }; + } + + /** + * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint. + * + * @param callable {@link Callable} + * @param hint hint value + * @return {@link PickyCallable} + */ + public static PickyCallable picky(Callable callable, int hint) { + return picky(callable, (c) -> hint); + } + + /** + * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint. + * + * @param callable {@link Callable} + * @param hint hint function + * @return {@link PickyCallable} + */ + public static PickyCallable picky(Callable callable, Function, Integer> hint) { + checkNotNull(callable); + checkNotNull(hint); + return new PickyCallable() { + + @Override + public T call() throws Exception { + return callable.call(); + } + + @Override + public int hint() { + return hint.apply(callable); + } + + }; + } + + /** + * Abstraction to give a task a way to express it's preference to run on + * certain thread. + */ + public static interface PickyTask { + + /** + * Returns hint for choosing which Thread to run this task on. + * + * @return hint value + */ + int hint(); + } + + /** + * A {@link FutureTask} implementing {@link PickyTask}. + *

+ * Note: if the wrapped {@link Callable} or {@link Runnable} was an instance of + * {@link PickyTask}, it will use {@link PickyTask#hint()} value, if not use {@link Object#hashCode()}. + * + * @param result type. + */ + public static class PickyFutureTask + extends FutureTask + implements PickyTask { + + private final Object runnableOrCallable; + + /** + * Same as {@link FutureTask#FutureTask(Runnable, Object)}. + */ + public PickyFutureTask(Runnable runnable, T value) { + super(runnable, value); + runnableOrCallable = checkNotNull(runnable); + } + + /** + * Same as {@link FutureTask#FutureTask(Callable)}. + */ + public PickyFutureTask(Callable callable) { + super(callable); + runnableOrCallable = checkNotNull(callable); + } + + @Override + public int hint() { + if (runnableOrCallable instanceof PickyTask) { + return ((PickyTask) runnableOrCallable).hint(); + } else { + return runnableOrCallable.hashCode(); + } + } + } +} diff --git a/utils/misc/src/test/java/org/onlab/util/PredictableExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/PredictableExecutorTest.java new file mode 100644 index 0000000000..5a2d6e5b5a --- /dev/null +++ b/utils/misc/src/test/java/org/onlab/util/PredictableExecutorTest.java @@ -0,0 +1,124 @@ +package org.onlab.util; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.util.PredictableExecutor.PickyRunnable; +import com.google.common.testing.EqualsTester; + +public class PredictableExecutorTest { + + private PredictableExecutor pexecutor; + private ExecutorService executor; + + @Before + public void setUp() { + pexecutor = new PredictableExecutor(3, Tools.namedThreads("Thread-%d")); + executor = pexecutor; + } + + @After + public void tearDown() { + pexecutor.shutdownNow(); + } + + @Test + public void test() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(7); + AtomicReference hintValue0 = new AtomicReference<>(""); + AtomicReference hintValue1 = new AtomicReference<>(""); + AtomicReference hintFunction0 = new AtomicReference<>(""); + AtomicReference pickyRunnable0 = new AtomicReference<>(""); + AtomicReference pickyRunnable1 = new AtomicReference<>(""); + AtomicReference pickyCallable0 = new AtomicReference<>(""); + AtomicReference hashCode0 = new AtomicReference<>(""); + + pexecutor.execute(() -> { + hintValue0.set(Thread.currentThread().getName()); + latch.countDown(); + }, 0); + + pexecutor.execute(() -> { + hintValue1.set(Thread.currentThread().getName()); + latch.countDown(); + }, 1); + + pexecutor.execute(() -> { + hintFunction0.set(Thread.currentThread().getName()); + latch.countDown(); + }, (runnable) -> 0); + + pexecutor.execute(new PickyRunnable() { + + @Override + public void run() { + pickyRunnable0.set(Thread.currentThread().getName()); + latch.countDown(); + } + + @Override + public int hint() { + return 0; + } + }); + + executor.execute(new PickyRunnable() { + + @Override + public void run() { + pickyRunnable1.set(Thread.currentThread().getName()); + latch.countDown(); + } + + @Override + public int hint() { + return 1; + } + }); + + Callable callable = new Callable() { + @Override + public Void call() { + pickyCallable0.set(Thread.currentThread().getName()); + latch.countDown(); + return null; + } + }; + + executor.submit(PredictableExecutor.picky(callable, 0)); + + + executor.execute(new Runnable() { + + @Override + public void run() { + hashCode0.set(Thread.currentThread().getName()); + latch.countDown(); + + } + + @Override + public int hashCode() { + return 0; + } + }); + + latch.await(1, TimeUnit.SECONDS); + + new EqualsTester() + .addEqualityGroup(hintValue0.get(), + hintFunction0.get(), + pickyRunnable0.get(), + pickyCallable0.get(), + hashCode0.get()) + .addEqualityGroup(hintValue1.get(), + pickyRunnable1.get()) + .testEquals(); + } +}