diff --git a/cli/src/main/java/org/onosproject/cli/net/PacketProcessorsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/PacketProcessorsListCommand.java index ff66b80381..6b7d93362e 100644 --- a/cli/src/main/java/org/onosproject/cli/net/PacketProcessorsListCommand.java +++ b/cli/src/main/java/org/onosproject/cli/net/PacketProcessorsListCommand.java @@ -17,7 +17,7 @@ package org.onosproject.cli.net; import org.apache.karaf.shell.commands.Command; import org.onosproject.cli.AbstractShellCommand; -import org.onosproject.net.packet.PacketProcessor; +import org.onosproject.net.packet.PacketProcessorEntry; import org.onosproject.net.packet.PacketService; import static org.onosproject.net.packet.PacketProcessor.ADVISOR_MAX; @@ -30,7 +30,7 @@ import static org.onosproject.net.packet.PacketProcessor.DIRECTOR_MAX; description = "Lists packet processors") public class PacketProcessorsListCommand extends AbstractShellCommand { - private static final String FMT = "priority=%s, class=%s"; + private static final String FMT = "priority=%s, class=%s, packets=%d, avgNanos=%d"; @Override protected void execute() { @@ -43,8 +43,10 @@ public class PacketProcessorsListCommand extends AbstractShellCommand { } } - private void print(int priority, PacketProcessor processor) { - print(FMT, priorityFormat(priority), processor.getClass().getName()); + private void print(PacketProcessorEntry entry) { + print(FMT, priorityFormat(entry.priority()), + entry.processor().getClass().getName(), + entry.invocations(), entry.averageNanos()); } private String priorityFormat(int priority) { diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketProcessorEntry.java b/core/api/src/main/java/org/onosproject/net/packet/PacketProcessorEntry.java new file mode 100644 index 0000000000..40386fb79c --- /dev/null +++ b/core/api/src/main/java/org/onosproject/net/packet/PacketProcessorEntry.java @@ -0,0 +1,58 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.net.packet; + +/** + * Packet processor entry tracking the processor, its priority and + * time consumption. + */ +public interface PacketProcessorEntry { + + /** + * Returns the packet processor. + * + * @return packet processor + */ + PacketProcessor processor(); + + /** + * Returns the packet processor priority. + * + * @return processor priority + */ + int priority(); + + /** + * Returns the number of invocations. + * + * @return number of invocations + */ + long invocations(); + + /** + * Returns the total time, in nanoseconds, spent processing packets. + * + * @return total time in nanos + */ + long totalNanos(); + + /** + * Returns the average time, in nanoseconds, spent processing packets. + * + * @return average time in nanos + */ + long averageNanos(); +} diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketService.java b/core/api/src/main/java/org/onosproject/net/packet/PacketService.java index 98f4d8e009..2e7a1b91d0 100644 --- a/core/api/src/main/java/org/onosproject/net/packet/PacketService.java +++ b/core/api/src/main/java/org/onosproject/net/packet/PacketService.java @@ -20,7 +20,6 @@ import org.onosproject.core.ApplicationId; import org.onosproject.net.flow.TrafficSelector; import java.util.List; -import java.util.Map; /** * Service for intercepting data plane packets and for emitting synthetic @@ -52,13 +51,12 @@ public interface PacketService { void removeProcessor(PacketProcessor processor); /** - * Returns priority bindings of all registered packet processors. + * Returns priority bindings of all registered packet processor entries. * - * @return list of existing packet processors + * @return list of existing packet processor entries */ @Beta - // TODO: Consider returning list of PacketProcessorEntry with processor, priority and stats - Map getProcessors(); + List getProcessors(); /** * Requests that packets matching the given selector are punted from the diff --git a/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java index c438659328..2993ce6bb9 100644 --- a/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java +++ b/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java @@ -19,7 +19,6 @@ import org.onosproject.core.ApplicationId; import org.onosproject.net.flow.TrafficSelector; import java.util.List; -import java.util.Map; /** * Test adapter for packet service. @@ -34,7 +33,7 @@ public class PacketServiceAdapter implements PacketService { } @Override - public Map getProcessors() { + public List getProcessors() { return null; } diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java index a0bc693c89..033798762d 100644 --- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java +++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java @@ -15,7 +15,8 @@ */ package org.onosproject.net.packet.impl; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -43,6 +44,7 @@ import org.onosproject.net.packet.PacketContext; import org.onosproject.net.packet.PacketEvent; import org.onosproject.net.packet.PacketPriority; import org.onosproject.net.packet.PacketProcessor; +import org.onosproject.net.packet.PacketProcessorEntry; import org.onosproject.net.packet.PacketProvider; import org.onosproject.net.packet.PacketProviderRegistry; import org.onosproject.net.packet.PacketProviderService; @@ -55,8 +57,6 @@ import org.onosproject.net.provider.AbstractProviderService; import org.slf4j.Logger; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -102,18 +102,18 @@ public class PacketManager private final DeviceListener deviceListener = new InternalDeviceListener(); - private final Map processors = new ConcurrentHashMap<>(); + private final List processors = Lists.newCopyOnWriteArrayList(); private ApplicationId appId; @Activate public void activate() { eventHandlingExecutor = Executors.newSingleThreadExecutor( - groupedThreads("onos/net/packet", "event-handler")); + groupedThreads("onos/net/packet", "event-handler")); appId = coreService.getAppId(CoreService.CORE_APP_NAME); store.setDelegate(delegate); deviceService.addListener(deviceListener); - // TODO: Should we request packets for all existing devices? I believe we should. + store.existingRequests().forEach(this::pushToAllDevices); log.info("Started"); } @@ -121,6 +121,7 @@ public class PacketManager public void deactivate() { store.unsetDelegate(delegate); deviceService.removeListener(deviceListener); + store.existingRequests().forEach(this::removeFromAllDevices); eventHandlingExecutor.shutdown(); log.info("Stopped"); } @@ -129,19 +130,35 @@ public class PacketManager public void addProcessor(PacketProcessor processor, int priority) { checkPermission(PACKET_EVENT); checkNotNull(processor, "Processor cannot be null"); - processors.put(priority, processor); + ProcessorEntry entry = new ProcessorEntry(processor, priority); + + // Insert the new processor according to its priority. + int i = 0; + for (; i < processors.size(); i++) { + if (priority < processors.get(i).priority()) { + break; + } + } + processors.add(i, entry); } @Override public void removeProcessor(PacketProcessor processor) { checkPermission(PACKET_EVENT); checkNotNull(processor, "Processor cannot be null"); - processors.values().remove(processor); + + // Remove the processor entry. + for (int i = 0; i < processors.size(); i++) { + if (processors.get(i).processor() == processor) { + processors.remove(i); + break; + } + } } @Override - public Map getProcessors() { - return ImmutableMap.copyOf(processors); + public List getProcessors() { + return ImmutableList.copyOf(processors); } @Override @@ -175,6 +192,18 @@ public class PacketManager return store.existingRequests(); } + /** + * Pushes all rules to the specified device. + * + * @param device device on which to install packet request flows + */ + private void pushRulesToDevice(Device device) { + log.debug("Pushing packet requests to device {}", device.id()); + for (PacketRequest request : store.existingRequests()) { + pushRule(device, request); + } + } + /** * Pushes a packet request flow rule to all devices. * @@ -187,16 +216,13 @@ public class PacketManager } } - /** * Removes packet request flow rule from all devices. * * @param request the packet request */ private void removeFromAllDevices(PacketRequest request) { - for (Device device : deviceService.getDevices()) { - removeRule(device, request); - } + deviceService.getAvailableDevices().forEach(d -> removeRule(d, request)); } /** @@ -232,7 +258,6 @@ public class PacketManager if (!device.type().equals(Device.Type.SWITCH)) { return; } - ForwardingObjective forwarding = createBuilder(request) .remove(new ObjectiveContext() { @Override @@ -241,7 +266,6 @@ public class PacketManager request, device.id(), error); } }); - objectiveService.forward(device.id(), forwarding); } @@ -263,12 +287,10 @@ public class PacketManager } private void localEmit(OutboundPacket packet) { - final Device device = deviceService.getDevice(packet.sendThrough()); - + Device device = deviceService.getDevice(packet.sendThrough()); if (device == null) { return; } - PacketProvider packetProvider = getProvider(device.providerId()); if (packetProvider != null) { packetProvider.emit(packet); @@ -280,7 +302,9 @@ public class PacketManager return new InternalPacketProviderService(provider); } - // Personalized packet provider service issued to the supplied provider. + /** + * Personalized packet provider service issued to the supplied provider. + */ private class InternalPacketProviderService extends AbstractProviderService implements PacketProviderService { @@ -292,8 +316,10 @@ public class PacketManager @Override public void processPacket(PacketContext context) { // TODO filter packets sent to processors based on registrations - for (PacketProcessor processor : processors.values()) { - processor.process(context); + for (ProcessorEntry entry : processors) { + long start = System.nanoTime(); + entry.processor().process(context); + entry.addNanos(System.nanoTime() - start); } } @@ -319,17 +345,14 @@ public class PacketManager try { Device device = event.subject(); switch (event.type()) { - case DEVICE_ADDED: - case DEVICE_AVAILABILITY_CHANGED: - if (deviceService.isAvailable(event.subject().id())) { - log.debug("Pushing packet requests to device {}", event.subject().id()); - for (PacketRequest request : store.existingRequests()) { - pushRule(device, request); + case DEVICE_ADDED: + case DEVICE_AVAILABILITY_CHANGED: + if (deviceService.isAvailable(event.subject().id())) { + pushRulesToDevice(device); } - } - break; - default: - break; + break; + default: + break; } } catch (Exception e) { log.warn("Failed to process {}", event, e); @@ -338,4 +361,48 @@ public class PacketManager } } + /** + * Entity for tracking stats for a packet processor. + */ + private class ProcessorEntry implements PacketProcessorEntry { + private final PacketProcessor processor; + private final int priority; + private long invocations = 0; + private long nanos = 0; + + public ProcessorEntry(PacketProcessor processor, int priority) { + this.processor = processor; + this.priority = priority; + } + + @Override + public PacketProcessor processor() { + return processor; + } + + @Override + public int priority() { + return priority; + } + + @Override + public long invocations() { + return invocations; + } + + @Override + public long totalNanos() { + return nanos; + } + + @Override + public long averageNanos() { + return invocations > 0 ? nanos / invocations : 0; + } + + void addNanos(long nanos) { + this.nanos += nanos; + this.invocations++; + } + } }