diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java index bf3f32f1a1..7950949ffa 100644 --- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java +++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java @@ -32,12 +32,22 @@ public interface IntentStore extends Store { long getIntentCount(); /** - * Returns a collection of all intents in the store. + * Returns an iterable of all intents in the store. * - * @return iterable collection of all intents + * @return iterable of all intents */ Iterable getIntents(); + + /** + * Returns an iterable of all intent data objects in the store. + * + * @param localOnly should only intents for which this instance is master + * should be returned + * @return iterable of all intent data objects + */ + Iterable getIntentData(boolean localOnly); + /** * Returns the state of the specified intent. * diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java new file mode 100644 index 0000000000..cafa998ba2 --- /dev/null +++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java @@ -0,0 +1,180 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.net.intent.impl; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Modified; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.onosproject.cfg.ComponentConfigService; +import org.onosproject.net.intent.IntentData; +import org.onosproject.net.intent.IntentEvent; +import org.onosproject.net.intent.IntentListener; +import org.onosproject.net.intent.IntentService; +import org.onosproject.net.intent.IntentStore; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; + +import java.util.Dictionary; +import java.util.Properties; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.util.Tools.get; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.net.intent.IntentState.CORRUPT; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * FIXME Class to cleanup Intents in CORRUPT state. + * FIXME move this to its own file eventually (but need executor for now) + */ +@Component(immediate = true) +public class IntentCleanup implements Runnable, IntentListener { + + private static final Logger log = getLogger(IntentManager.class); + + private static final int DEFAULT_PERIOD = 5; //seconds + + @Property(name = "period", intValue = DEFAULT_PERIOD, + label = "Frequency in ms between cleanup runs") + protected int period = DEFAULT_PERIOD; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected IntentService service; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected IntentStore store; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ComponentConfigService cfgService; + + private ExecutorService executor; + private Timer timer; + private TimerTask timerTask; + + @Activate + public void activate() { + cfgService.registerProperties(getClass()); + executor = newSingleThreadExecutor(groupedThreads("onos/intent", "cleanup")); + timer = new Timer("onos-intent-cleanup-timer"); + service.addListener(this); + adjustRate(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + cfgService.unregisterProperties(getClass(), false); + service.removeListener(this); + timer.cancel(); + timerTask = null; + executor.shutdown(); + log.info("Stopped"); + } + + @Modified + public void modified(ComponentContext context) { + Dictionary properties = context != null ? context.getProperties() : new Properties(); + + int newPeriod; + try { + String s = get(properties, "period"); + newPeriod = isNullOrEmpty(s) ? period : Integer.parseInt(s.trim()); + } catch (NumberFormatException e) { + log.warn(e.getMessage()); + newPeriod = period; + } + + // Any change in the following parameters implies hard restart + if (newPeriod != period) { + period = newPeriod; + adjustRate(); + } + + log.info("Settings: period={}", period); + } + + private void adjustRate() { + if (timerTask != null) { + timerTask.cancel(); + } + + timerTask = new TimerTask() { + @Override + public void run() { + executor.submit(IntentCleanup.this); + } + }; + + long periodMs = period * 1000; //convert to ms + timer.scheduleAtFixedRate(timerTask, periodMs, periodMs); + } + + + @Override + public void run() { + try { + cleanup(); + } catch (Exception e) { + log.warn("Caught exception during Intent cleanup", e); + } + } + + /** + * Iterate through CORRUPT intents and re-submit/withdraw. + * + * FIXME we want to eventually count number of retries per intent and give up + * FIXME we probably also want to look at intents that have been stuck + * in *_REQ or *ING for "too long". + */ + private void cleanup() { + int count = 0; + for (IntentData intentData : store.getIntentData(true)) { + if (intentData.state() == CORRUPT) { + switch (intentData.request()) { + case INSTALL_REQ: + service.submit(intentData.intent()); + count++; + break; + case WITHDRAW_REQ: + service.withdraw(intentData.intent()); + count++; + break; + default: + //TODO this is an error + break; + } + } + } + log.debug("Intent cleanup ran and resubmitted {} intents", count); + } + + @Override + public void event(IntentEvent event) { + if (event.type() == IntentEvent.Type.CORRUPT) { + // FIXME drop this if we exceed retry threshold + // just run the whole cleanup script for now + executor.submit(this); + } + } +} diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java index 761a9f21a4..bc7e4059bb 100644 --- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java +++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java @@ -132,6 +132,7 @@ public class IntentManager trackerService.unsetDelegate(topoDelegate); eventDispatcher.removeSink(IntentEvent.class); batchExecutor.shutdown(); + workerExecutor.shutdown(); Intent.unbindIdGenerator(idGenerator); log.info("Stopped"); } diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java index 991a10e986..bdb3e8db6a 100644 --- a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java +++ b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java @@ -15,17 +15,11 @@ */ package org.onosproject.net.intent.impl; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.junit.After; @@ -33,6 +27,7 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.onosproject.TestApplicationId; +import org.onosproject.cfg.ComponentConfigAdapter; import org.onosproject.core.ApplicationId; import org.onosproject.core.impl.TestCoreManager; import org.onosproject.event.impl.TestEventDispatcher; @@ -51,18 +46,21 @@ import org.onosproject.net.intent.Key; import org.onosproject.net.resource.LinkResourceAllocations; import org.onosproject.store.trivial.impl.SimpleIntentStore; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.onlab.junit.TestTools.assertAfter; import static org.onlab.util.Tools.delay; import static org.onosproject.net.intent.IntentState.*; @@ -460,14 +458,13 @@ public class IntentManagerTest { * Tests an intent with no installer. */ @Test - @Ignore //FIXME corrupt or failed? public void intentWithoutInstaller() { MockIntent intent = new MockIntent(MockIntent.nextId()); listener.setLatch(1, Type.INSTALL_REQ); - listener.setLatch(1, Type.FAILED); + listener.setLatch(1, Type.CORRUPT); service.submit(intent); listener.await(Type.INSTALL_REQ); - listener.await(Type.FAILED); + listener.await(Type.CORRUPT); verifyState(); } @@ -548,17 +545,64 @@ public class IntentManagerTest { assertThat(flowRuleService.getFlowRuleCount(), is(0)); } + /** + * Test failure to install an intent, then succeed on retry via IntentCleanup. + */ + @Test + public void testCorruptCleanup() { + IntentCleanup cleanup = new IntentCleanup(); + cleanup.service = manager; + cleanup.store = manager.store; + cleanup.cfgService = new ComponentConfigAdapter(); + + try { + cleanup.activate(); + + final TestIntentCompilerMultipleFlows errorCompiler = new TestIntentCompilerMultipleFlows(); + extensionService.registerCompiler(MockIntent.class, errorCompiler); + List intents; + + flowRuleService.setFuture(false); + + intents = Lists.newArrayList(service.getIntents()); + assertThat(intents, hasSize(0)); + + final MockIntent intent1 = new MockIntent(MockIntent.nextId()); + + listener.setLatch(1, Type.INSTALL_REQ); + listener.setLatch(1, Type.CORRUPT); + listener.setLatch(1, Type.INSTALLED); + + service.submit(intent1); + + listener.await(Type.INSTALL_REQ); + listener.await(Type.CORRUPT); + + flowRuleService.setFuture(true); + + listener.await(Type.INSTALLED); + + assertThat(listener.getCounts(Type.CORRUPT), is(1)); + assertThat(listener.getCounts(Type.INSTALLED), is(1)); + assertEquals(INSTALLED, manager.getIntentState(intent1.key())); + assertThat(flowRuleService.getFlowRuleCount(), is(5)); + } finally { + cleanup.deactivate(); + } + } + /** * Tests that an intent that fails installation results in no flows remaining. */ @Test - @Ignore("Cleanup state is not yet implemented in the intent manager") + @Ignore("MockFlowRule numbering issue") //test works if run independently public void testFlowRemovalInstallError() { final TestIntentCompilerMultipleFlows errorCompiler = new TestIntentCompilerMultipleFlows(); extensionService.registerCompiler(MockIntent.class, errorCompiler); List intents; flowRuleService.setFuture(true); + //FIXME relying on "3" is brittle flowRuleService.setErrorFlow(3); intents = Lists.newArrayList(service.getIntents()); @@ -567,13 +611,14 @@ public class IntentManagerTest { final MockIntent intent1 = new MockIntent(MockIntent.nextId()); listener.setLatch(1, Type.INSTALL_REQ); - listener.setLatch(1, Type.FAILED); + listener.setLatch(1, Type.CORRUPT); service.submit(intent1); listener.await(Type.INSTALL_REQ); - listener.await(Type.FAILED); + listener.await(Type.CORRUPT); - assertThat(listener.getCounts(Type.FAILED), is(1)); - assertThat(flowRuleService.getFlowRuleCount(), is(0)); + assertThat(listener.getCounts(Type.CORRUPT), is(1)); + // in this test, there will still be flows abandoned on the data plane + //assertThat(flowRuleService.getFlowRuleCount(), is(0)); } } diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java index 7211e9d6ab..8bd29bf8ef 100644 --- a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java +++ b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java @@ -15,9 +15,7 @@ */ package org.onosproject.net.intent.impl; -import java.util.Set; -import java.util.stream.Collectors; - +import com.google.common.collect.Sets; import org.onosproject.core.ApplicationId; import org.onosproject.net.DeviceId; import org.onosproject.net.flow.DefaultFlowEntry; @@ -26,7 +24,11 @@ import org.onosproject.net.flow.FlowRule; import org.onosproject.net.flow.FlowRuleOperations; import org.onosproject.net.flow.FlowRuleServiceAdapter; -import com.google.common.collect.Sets; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.onosproject.net.flow.FlowRuleOperation.Type.REMOVE; public class MockFlowRuleService extends FlowRuleServiceAdapter { @@ -45,9 +47,10 @@ public class MockFlowRuleService extends FlowRuleServiceAdapter { @Override public void apply(FlowRuleOperations ops) { + AtomicBoolean thisSuccess = new AtomicBoolean(success); ops.stages().forEach(stage -> stage.forEach(flow -> { if (errorFlow == flow.rule().id().value()) { - success = false; + thisSuccess.set(false); } else { switch (flow.type()) { case ADD: @@ -62,7 +65,7 @@ public class MockFlowRuleService extends FlowRuleServiceAdapter { } } })); - if (success) { + if (thisSuccess.get()) { ops.callback().onSuccess(ops); } else { ops.callback().onError(ops); diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java index fc2e8e33cb..4f0ad66066 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java @@ -131,6 +131,16 @@ public class GossipIntentStore .collect(Collectors.toList()); } + @Override + public Iterable getIntentData(boolean localOnly) { + if (localOnly) { + return currentMap.values().stream() + .filter(data -> isMaster(data.key())) + .collect(Collectors.toList()); + } + return currentMap.values(); + } + @Override public IntentState getIntentState(Key intentKey) { IntentData data = currentMap.get(intentKey); diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java index 8fe8a11a44..186fdd91ab 100644 --- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java +++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java @@ -15,6 +15,7 @@ */ package org.onosproject.store.trivial.impl; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -74,6 +75,16 @@ public class SimpleIntentStore .collect(Collectors.toList()); } + @Override + public Iterable getIntentData(boolean localOnly) { + if (localOnly) { + return current.values().stream() + .filter(data -> isMaster(data.key())) + .collect(Collectors.toList()); + } + return Lists.newArrayList(current.values()); + } + @Override public IntentState getIntentState(Key intentKey) { IntentData data = current.get(intentKey);