diff --git a/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java b/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java index b142282dba..48a51e500a 100644 --- a/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java +++ b/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java @@ -20,12 +20,17 @@ import static org.onlab.util.Tools.get; import static org.slf4j.LoggerFactory.getLogger; import java.util.Dictionary; -import java.util.concurrent.ExecutorService; +import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.commons.lang.math.RandomUtils; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -34,6 +39,7 @@ import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onlab.util.Tools; import org.onosproject.cfg.ComponentConfigService; import org.onosproject.core.ApplicationId; import org.onosproject.core.CoreService; @@ -42,6 +48,7 @@ import org.onosproject.store.service.StorageService; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; /** @@ -69,24 +76,46 @@ public class DistributedConsensusLoadTest { protected CoreService coreService; private static final int DEFAULT_RATE = 100; + private static final int TOTAL_COUNTERS = 50; @Property(name = "rate", intValue = DEFAULT_RATE, label = "Total number of increments per second to the atomic counter") protected int rate = 0; - private AtomicLong lastValue = new AtomicLong(0); - private AtomicLong lastLoggedTime = new AtomicLong(0); - private AsyncAtomicCounter counter; - private ExecutorService testExecutor = Executors.newSingleThreadExecutor(); + private final AtomicLong previousReportTime = new AtomicLong(0); + private final AtomicLong previousCount = new AtomicLong(0); + private final AtomicInteger increments = new AtomicInteger(0); + private final List counters = Lists.newArrayList(); + private final ScheduledExecutorService runner = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor(); @Activate public void activate(ComponentContext context) { configService.registerProperties(getClass()); appId = coreService.registerApplication("org.onosproject.loadtest"); log.info("Started with {}", appId); - counter = storageService.atomicCounterBuilder() - .withName("onos-app-loadtest-counter") + for (int i = 0; i < TOTAL_COUNTERS; ++i) { + AsyncAtomicCounter counter = storageService.atomicCounterBuilder() + .withName(String.format("onos-app-loadtest-counter-%d", i)) .build(); + counters.add(counter); + } + reporter.scheduleWithFixedDelay(() -> { + Tools.allOf(counters.stream() + .map(AsyncAtomicCounter::get) + .collect(Collectors.toList())) + .whenComplete((r, e) -> { + if (e == null) { + long newCount = r.stream().reduce(Long::sum).get(); + long currentTime = System.currentTimeMillis(); + long delta = currentTime - previousReportTime.getAndSet(currentTime); + long rate = (newCount - previousCount.getAndSet(newCount)) * 1000 / delta; + log.info("{} updates per second", rate); + } else { + log.warn(e.getMessage()); + } + }); + }, 5, 5, TimeUnit.SECONDS); modified(null); } @@ -97,16 +126,10 @@ public class DistributedConsensusLoadTest { while (!stopped.get()) { limiter.acquire(); s.acquireUninterruptibly(); - counter.incrementAndGet().whenComplete((r, e) -> { + counters.get(RandomUtils.nextInt(TOTAL_COUNTERS)).incrementAndGet().whenComplete((r, e) -> { s.release(); - long delta = System.currentTimeMillis() - lastLoggedTime.get(); if (e == null) { - if (delta > 1000) { - long tps = (long) ((r - lastValue.get()) * 1000.0) / delta; - lastValue.set(r); - lastLoggedTime.set(System.currentTimeMillis()); - log.info("Rate: {}", tps); - } + increments.incrementAndGet(); } }); } @@ -120,7 +143,8 @@ public class DistributedConsensusLoadTest { public void deactivate(ComponentContext context) { configService.unregisterProperties(getClass(), false); stopTest(); - testExecutor.shutdown(); + runner.shutdown(); + reporter.shutdown(); log.info("Stopped"); } @@ -138,10 +162,10 @@ public class DistributedConsensusLoadTest { } } if (newRate != rate) { - log.info("Rate changed to {}", newRate); + log.info("Per node rate changed to {}", newRate); rate = newRate; stopTest(); - testExecutor.execute(this::startTest); + runner.execute(this::startTest); } } }