ONOS- 2946 Adding ability to view packet processor statistics.

Change-Id: Ic55ec670b197b1ee08f2d11f97658fd614da1614
This commit is contained in:
Thomas Vachuska 2015-09-22 12:11:27 -07:00 committed by Gerrit Code Review
parent 0ae4560f76
commit 924cda424c
5 changed files with 167 additions and 43 deletions

View File

@ -17,7 +17,7 @@ package org.onosproject.cli.net;
import org.apache.karaf.shell.commands.Command; import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand; import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.net.packet.PacketProcessor; import org.onosproject.net.packet.PacketProcessorEntry;
import org.onosproject.net.packet.PacketService; import org.onosproject.net.packet.PacketService;
import static org.onosproject.net.packet.PacketProcessor.ADVISOR_MAX; import static org.onosproject.net.packet.PacketProcessor.ADVISOR_MAX;
@ -30,7 +30,7 @@ import static org.onosproject.net.packet.PacketProcessor.DIRECTOR_MAX;
description = "Lists packet processors") description = "Lists packet processors")
public class PacketProcessorsListCommand extends AbstractShellCommand { public class PacketProcessorsListCommand extends AbstractShellCommand {
private static final String FMT = "priority=%s, class=%s"; private static final String FMT = "priority=%s, class=%s, packets=%d, avgNanos=%d";
@Override @Override
protected void execute() { protected void execute() {
@ -43,8 +43,10 @@ public class PacketProcessorsListCommand extends AbstractShellCommand {
} }
} }
private void print(int priority, PacketProcessor processor) { private void print(PacketProcessorEntry entry) {
print(FMT, priorityFormat(priority), processor.getClass().getName()); print(FMT, priorityFormat(entry.priority()),
entry.processor().getClass().getName(),
entry.invocations(), entry.averageNanos());
} }
private String priorityFormat(int priority) { private String priorityFormat(int priority) {

View File

@ -0,0 +1,58 @@
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.packet;
/**
* Packet processor entry tracking the processor, its priority and
* time consumption.
*/
public interface PacketProcessorEntry {
/**
* Returns the packet processor.
*
* @return packet processor
*/
PacketProcessor processor();
/**
* Returns the packet processor priority.
*
* @return processor priority
*/
int priority();
/**
* Returns the number of invocations.
*
* @return number of invocations
*/
long invocations();
/**
* Returns the total time, in nanoseconds, spent processing packets.
*
* @return total time in nanos
*/
long totalNanos();
/**
* Returns the average time, in nanoseconds, spent processing packets.
*
* @return average time in nanos
*/
long averageNanos();
}

View File

@ -20,7 +20,6 @@ import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.flow.TrafficSelector;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Service for intercepting data plane packets and for emitting synthetic * Service for intercepting data plane packets and for emitting synthetic
@ -52,13 +51,12 @@ public interface PacketService {
void removeProcessor(PacketProcessor processor); void removeProcessor(PacketProcessor processor);
/** /**
* Returns priority bindings of all registered packet processors. * Returns priority bindings of all registered packet processor entries.
* *
* @return list of existing packet processors * @return list of existing packet processor entries
*/ */
@Beta @Beta
// TODO: Consider returning list of PacketProcessorEntry with processor, priority and stats List<PacketProcessorEntry> getProcessors();
Map<Integer, PacketProcessor> getProcessors();
/** /**
* Requests that packets matching the given selector are punted from the * Requests that packets matching the given selector are punted from the

View File

@ -19,7 +19,6 @@ import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.flow.TrafficSelector;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Test adapter for packet service. * Test adapter for packet service.
@ -34,7 +33,7 @@ public class PacketServiceAdapter implements PacketService {
} }
@Override @Override
public Map<Integer, PacketProcessor> getProcessors() { public List<PacketProcessorEntry> getProcessors() {
return null; return null;
} }

View File

@ -15,7 +15,8 @@
*/ */
package org.onosproject.net.packet.impl; package org.onosproject.net.packet.impl;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
@ -43,6 +44,7 @@ import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketEvent; import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketPriority; import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor; import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketProcessorEntry;
import org.onosproject.net.packet.PacketProvider; import org.onosproject.net.packet.PacketProvider;
import org.onosproject.net.packet.PacketProviderRegistry; import org.onosproject.net.packet.PacketProviderRegistry;
import org.onosproject.net.packet.PacketProviderService; import org.onosproject.net.packet.PacketProviderService;
@ -55,8 +57,6 @@ import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -102,18 +102,18 @@ public class PacketManager
private final DeviceListener deviceListener = new InternalDeviceListener(); private final DeviceListener deviceListener = new InternalDeviceListener();
private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>(); private final List<ProcessorEntry> processors = Lists.newCopyOnWriteArrayList();
private ApplicationId appId; private ApplicationId appId;
@Activate @Activate
public void activate() { public void activate() {
eventHandlingExecutor = Executors.newSingleThreadExecutor( eventHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/net/packet", "event-handler")); groupedThreads("onos/net/packet", "event-handler"));
appId = coreService.getAppId(CoreService.CORE_APP_NAME); appId = coreService.getAppId(CoreService.CORE_APP_NAME);
store.setDelegate(delegate); store.setDelegate(delegate);
deviceService.addListener(deviceListener); deviceService.addListener(deviceListener);
// TODO: Should we request packets for all existing devices? I believe we should. store.existingRequests().forEach(this::pushToAllDevices);
log.info("Started"); log.info("Started");
} }
@ -121,6 +121,7 @@ public class PacketManager
public void deactivate() { public void deactivate() {
store.unsetDelegate(delegate); store.unsetDelegate(delegate);
deviceService.removeListener(deviceListener); deviceService.removeListener(deviceListener);
store.existingRequests().forEach(this::removeFromAllDevices);
eventHandlingExecutor.shutdown(); eventHandlingExecutor.shutdown();
log.info("Stopped"); log.info("Stopped");
} }
@ -129,19 +130,35 @@ public class PacketManager
public void addProcessor(PacketProcessor processor, int priority) { public void addProcessor(PacketProcessor processor, int priority) {
checkPermission(PACKET_EVENT); checkPermission(PACKET_EVENT);
checkNotNull(processor, "Processor cannot be null"); checkNotNull(processor, "Processor cannot be null");
processors.put(priority, processor); ProcessorEntry entry = new ProcessorEntry(processor, priority);
// Insert the new processor according to its priority.
int i = 0;
for (; i < processors.size(); i++) {
if (priority < processors.get(i).priority()) {
break;
}
}
processors.add(i, entry);
} }
@Override @Override
public void removeProcessor(PacketProcessor processor) { public void removeProcessor(PacketProcessor processor) {
checkPermission(PACKET_EVENT); checkPermission(PACKET_EVENT);
checkNotNull(processor, "Processor cannot be null"); checkNotNull(processor, "Processor cannot be null");
processors.values().remove(processor);
// Remove the processor entry.
for (int i = 0; i < processors.size(); i++) {
if (processors.get(i).processor() == processor) {
processors.remove(i);
break;
}
}
} }
@Override @Override
public Map<Integer, PacketProcessor> getProcessors() { public List<PacketProcessorEntry> getProcessors() {
return ImmutableMap.copyOf(processors); return ImmutableList.copyOf(processors);
} }
@Override @Override
@ -175,6 +192,18 @@ public class PacketManager
return store.existingRequests(); return store.existingRequests();
} }
/**
* Pushes all rules to the specified device.
*
* @param device device on which to install packet request flows
*/
private void pushRulesToDevice(Device device) {
log.debug("Pushing packet requests to device {}", device.id());
for (PacketRequest request : store.existingRequests()) {
pushRule(device, request);
}
}
/** /**
* Pushes a packet request flow rule to all devices. * Pushes a packet request flow rule to all devices.
* *
@ -187,16 +216,13 @@ public class PacketManager
} }
} }
/** /**
* Removes packet request flow rule from all devices. * Removes packet request flow rule from all devices.
* *
* @param request the packet request * @param request the packet request
*/ */
private void removeFromAllDevices(PacketRequest request) { private void removeFromAllDevices(PacketRequest request) {
for (Device device : deviceService.getDevices()) { deviceService.getAvailableDevices().forEach(d -> removeRule(d, request));
removeRule(device, request);
}
} }
/** /**
@ -232,7 +258,6 @@ public class PacketManager
if (!device.type().equals(Device.Type.SWITCH)) { if (!device.type().equals(Device.Type.SWITCH)) {
return; return;
} }
ForwardingObjective forwarding = createBuilder(request) ForwardingObjective forwarding = createBuilder(request)
.remove(new ObjectiveContext() { .remove(new ObjectiveContext() {
@Override @Override
@ -241,7 +266,6 @@ public class PacketManager
request, device.id(), error); request, device.id(), error);
} }
}); });
objectiveService.forward(device.id(), forwarding); objectiveService.forward(device.id(), forwarding);
} }
@ -263,12 +287,10 @@ public class PacketManager
} }
private void localEmit(OutboundPacket packet) { private void localEmit(OutboundPacket packet) {
final Device device = deviceService.getDevice(packet.sendThrough()); Device device = deviceService.getDevice(packet.sendThrough());
if (device == null) { if (device == null) {
return; return;
} }
PacketProvider packetProvider = getProvider(device.providerId()); PacketProvider packetProvider = getProvider(device.providerId());
if (packetProvider != null) { if (packetProvider != null) {
packetProvider.emit(packet); packetProvider.emit(packet);
@ -280,7 +302,9 @@ public class PacketManager
return new InternalPacketProviderService(provider); return new InternalPacketProviderService(provider);
} }
// Personalized packet provider service issued to the supplied provider. /**
* Personalized packet provider service issued to the supplied provider.
*/
private class InternalPacketProviderService private class InternalPacketProviderService
extends AbstractProviderService<PacketProvider> extends AbstractProviderService<PacketProvider>
implements PacketProviderService { implements PacketProviderService {
@ -292,8 +316,10 @@ public class PacketManager
@Override @Override
public void processPacket(PacketContext context) { public void processPacket(PacketContext context) {
// TODO filter packets sent to processors based on registrations // TODO filter packets sent to processors based on registrations
for (PacketProcessor processor : processors.values()) { for (ProcessorEntry entry : processors) {
processor.process(context); long start = System.nanoTime();
entry.processor().process(context);
entry.addNanos(System.nanoTime() - start);
} }
} }
@ -319,17 +345,14 @@ public class PacketManager
try { try {
Device device = event.subject(); Device device = event.subject();
switch (event.type()) { switch (event.type()) {
case DEVICE_ADDED: case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED: case DEVICE_AVAILABILITY_CHANGED:
if (deviceService.isAvailable(event.subject().id())) { if (deviceService.isAvailable(event.subject().id())) {
log.debug("Pushing packet requests to device {}", event.subject().id()); pushRulesToDevice(device);
for (PacketRequest request : store.existingRequests()) {
pushRule(device, request);
} }
} break;
break; default:
default: break;
break;
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("Failed to process {}", event, e); log.warn("Failed to process {}", event, e);
@ -338,4 +361,48 @@ public class PacketManager
} }
} }
/**
* Entity for tracking stats for a packet processor.
*/
private class ProcessorEntry implements PacketProcessorEntry {
private final PacketProcessor processor;
private final int priority;
private long invocations = 0;
private long nanos = 0;
public ProcessorEntry(PacketProcessor processor, int priority) {
this.processor = processor;
this.priority = priority;
}
@Override
public PacketProcessor processor() {
return processor;
}
@Override
public int priority() {
return priority;
}
@Override
public long invocations() {
return invocations;
}
@Override
public long totalNanos() {
return nanos;
}
@Override
public long averageNanos() {
return invocations > 0 ? nanos / invocations : 0;
}
void addNanos(long nanos) {
this.nanos += nanos;
this.invocations++;
}
}
} }