Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

This commit is contained in:
Ayaka Koshibe 2014-10-02 17:44:21 -07:00
commit bb218b6965
50 changed files with 1356 additions and 211 deletions

View File

@ -3,8 +3,6 @@ package org.onlab.onos.config;
import java.util.List; import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
/** /**
* Represents a set of addresses bound to a port. * Represents a set of addresses bound to a port.
@ -12,8 +10,8 @@ import org.onlab.packet.MacAddress;
public class AddressEntry { public class AddressEntry {
private String dpid; private String dpid;
private short portNumber; private short portNumber;
private List<IpPrefix> ipAddresses; private List<String> ipAddresses;
private MacAddress macAddress; private String macAddress;
public String getDpid() { public String getDpid() {
return dpid; return dpid;
@ -33,21 +31,21 @@ public class AddressEntry {
this.portNumber = portNumber; this.portNumber = portNumber;
} }
public List<IpPrefix> getIpAddresses() { public List<String> getIpAddresses() {
return ipAddresses; return ipAddresses;
} }
@JsonProperty("ips") @JsonProperty("ips")
public void setIpAddresses(List<IpPrefix> ipAddresses) { public void setIpAddresses(List<String> strIps) {
this.ipAddresses = ipAddresses; this.ipAddresses = strIps;
} }
public MacAddress getMacAddress() { public String getMacAddress() {
return macAddress; return macAddress;
} }
@JsonProperty("mac") @JsonProperty("mac")
public void setMacAddress(MacAddress macAddress) { public void setMacAddress(String macAddress) {
this.macAddress = macAddress; this.macAddress = macAddress;
} }
} }

View File

@ -5,6 +5,8 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
@ -17,10 +19,10 @@ import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.host.HostAdminService; import org.onlab.onos.net.host.HostAdminService;
import org.onlab.onos.net.host.PortAddresses; import org.onlab.onos.net.host.PortAddresses;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.slf4j.Logger; import org.slf4j.Logger;
import com.google.common.collect.Sets;
/** /**
* Simple configuration module to read in supplementary network configuration * Simple configuration module to read in supplementary network configuration
* from a file. * from a file.
@ -51,9 +53,29 @@ public class NetworkConfigReader {
DeviceId.deviceId(dpidToUri(entry.getDpid())), DeviceId.deviceId(dpidToUri(entry.getDpid())),
PortNumber.portNumber(entry.getPortNumber())); PortNumber.portNumber(entry.getPortNumber()));
Set<IpPrefix> ipAddresses = new HashSet<IpPrefix>();
for (String strIp : entry.getIpAddresses()) {
try {
IpPrefix address = IpPrefix.valueOf(strIp);
ipAddresses.add(address);
} catch (IllegalArgumentException e) {
log.warn("Bad format for IP address in config: {}", strIp);
}
}
MacAddress macAddress = null;
if (entry.getMacAddress() != null) {
try {
macAddress = MacAddress.valueOf(entry.getMacAddress());
} catch (IllegalArgumentException e) {
log.warn("Bad format for MAC address in config: {}",
entry.getMacAddress());
}
}
PortAddresses addresses = new PortAddresses(cp, PortAddresses addresses = new PortAddresses(cp,
Sets.newHashSet(entry.getIpAddresses()), ipAddresses, macAddress);
entry.getMacAddress());
hostAdminService.bindAddressesToPort(addresses); hostAdminService.bindAddressesToPort(addresses);
} }

View File

@ -38,6 +38,8 @@ import org.slf4j.Logger;
@Component(immediate = true) @Component(immediate = true)
public class ReactiveForwarding { public class ReactiveForwarding {
private static final int TIMEOUT = 10;
private final Logger log = getLogger(getClass()); private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@ -184,15 +186,15 @@ public class ReactiveForwarding {
Ethernet inPkt = context.inPacket().parsed(); Ethernet inPkt = context.inPacket().parsed();
TrafficSelector.Builder builder = new DefaultTrafficSelector.Builder(); TrafficSelector.Builder builder = new DefaultTrafficSelector.Builder();
builder.matchEthType(inPkt.getEtherType()) builder.matchEthType(inPkt.getEtherType())
.matchEthSrc(inPkt.getSourceMAC()) .matchEthSrc(inPkt.getSourceMAC())
.matchEthDst(inPkt.getDestinationMAC()) .matchEthDst(inPkt.getDestinationMAC())
.matchInport(context.inPacket().receivedFrom().port()); .matchInport(context.inPacket().receivedFrom().port());
TrafficTreatment.Builder treat = new DefaultTrafficTreatment.Builder(); TrafficTreatment.Builder treat = new DefaultTrafficTreatment.Builder();
treat.setOutput(portNumber); treat.setOutput(portNumber);
FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(), FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(),
builder.build(), treat.build(), 0, appId); builder.build(), treat.build(), 0, appId, TIMEOUT);
flowRuleService.applyFlowRules(f); flowRuleService.applyFlowRules(f);
} }

View File

@ -27,11 +27,12 @@ public class DefaultFlowRule implements FlowRule {
private final ApplicationId appId; private final ApplicationId appId;
private boolean expired; private final int timeout;
public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector, public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowRuleState state, TrafficTreatment treatment, int priority, FlowRuleState state,
long life, long packets, long bytes, long flowId, boolean expired) { long life, long packets, long bytes, long flowId, boolean expired,
int timeout) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.priority = priority; this.priority = priority;
this.selector = selector; this.selector = selector;
@ -39,26 +40,30 @@ public class DefaultFlowRule implements FlowRule {
this.state = state; this.state = state;
this.appId = ApplicationId.valueOf((int) (flowId >> 32)); this.appId = ApplicationId.valueOf((int) (flowId >> 32));
this.id = FlowId.valueOf(flowId); this.id = FlowId.valueOf(flowId);
this.expired = expired;
this.life = life; this.life = life;
this.packets = packets; this.packets = packets;
this.bytes = bytes; this.bytes = bytes;
this.created = System.currentTimeMillis(); this.created = System.currentTimeMillis();
this.timeout = timeout;
} }
public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector, public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatement, int priority, ApplicationId appId) { TrafficTreatment treatement, int priority, ApplicationId appId,
this(deviceId, selector, treatement, priority, FlowRuleState.CREATED, appId); int timeout) {
this(deviceId, selector, treatement, priority,
FlowRuleState.CREATED, appId, timeout);
} }
public DefaultFlowRule(FlowRule rule, FlowRuleState state) { public DefaultFlowRule(FlowRule rule, FlowRuleState state) {
this(rule.deviceId(), rule.selector(), rule.treatment(), this(rule.deviceId(), rule.selector(), rule.treatment(),
rule.priority(), state, rule.id(), rule.appId()); rule.priority(), state, rule.id(), rule.appId(),
rule.timeout());
} }
private DefaultFlowRule(DeviceId deviceId, private DefaultFlowRule(DeviceId deviceId,
TrafficSelector selector, TrafficTreatment treatment, TrafficSelector selector, TrafficTreatment treatment,
int priority, FlowRuleState state, ApplicationId appId) { int priority, FlowRuleState state, ApplicationId appId,
int timeout) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.priority = priority; this.priority = priority;
this.selector = selector; this.selector = selector;
@ -69,13 +74,16 @@ public class DefaultFlowRule implements FlowRule {
this.bytes = 0; this.bytes = 0;
this.appId = appId; this.appId = appId;
this.timeout = timeout;
this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL)); this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL));
this.created = System.currentTimeMillis(); this.created = System.currentTimeMillis();
} }
private DefaultFlowRule(DeviceId deviceId, private DefaultFlowRule(DeviceId deviceId,
TrafficSelector selector, TrafficTreatment treatment, TrafficSelector selector, TrafficTreatment treatment,
int priority, FlowRuleState state, FlowId flowId, ApplicationId appId) { int priority, FlowRuleState state, FlowId flowId, ApplicationId appId,
int timeout) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.priority = priority; this.priority = priority;
this.selector = selector; this.selector = selector;
@ -86,6 +94,7 @@ public class DefaultFlowRule implements FlowRule {
this.bytes = 0; this.bytes = 0;
this.appId = appId; this.appId = appId;
this.id = flowId; this.id = flowId;
this.timeout = timeout;
this.created = System.currentTimeMillis(); this.created = System.currentTimeMillis();
} }
@ -149,7 +158,7 @@ public class DefaultFlowRule implements FlowRule {
* @see java.lang.Object#equals(java.lang.Object) * @see java.lang.Object#equals(java.lang.Object)
*/ */
public int hashCode() { public int hashCode() {
return Objects.hash(deviceId, id); return Objects.hash(deviceId, selector, priority);
} }
public int hash() { public int hash() {
@ -170,7 +179,10 @@ public class DefaultFlowRule implements FlowRule {
if (obj instanceof DefaultFlowRule) { if (obj instanceof DefaultFlowRule) {
DefaultFlowRule that = (DefaultFlowRule) obj; DefaultFlowRule that = (DefaultFlowRule) obj;
return Objects.equals(deviceId, that.deviceId) && return Objects.equals(deviceId, that.deviceId) &&
Objects.equals(id, that.id); //Objects.equals(id, that.id) &&
Objects.equals(priority, that.priority) &&
Objects.equals(selector, that.selector);
} }
return false; return false;
} }
@ -181,16 +193,16 @@ public class DefaultFlowRule implements FlowRule {
.add("id", id) .add("id", id)
.add("deviceId", deviceId) .add("deviceId", deviceId)
.add("priority", priority) .add("priority", priority)
.add("selector", selector) .add("selector", selector.criteria())
.add("treatment", treatment) .add("treatment", treatment == null ? "N/A" : treatment.instructions())
.add("created", created) .add("created", created)
.add("state", state) .add("state", state)
.toString(); .toString();
} }
@Override @Override
public boolean expired() { public int timeout() {
return expired; return timeout > MAX_TIMEOUT ? MAX_TIMEOUT : this.timeout;
} }
} }

View File

@ -3,8 +3,9 @@ package org.onlab.onos.net.flow;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.HashSet;
import java.util.List; import java.util.Objects;
import java.util.Set;
import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.criteria.Criteria; import org.onlab.onos.net.flow.criteria.Criteria;
@ -16,22 +17,42 @@ import org.slf4j.Logger;
public final class DefaultTrafficSelector implements TrafficSelector { public final class DefaultTrafficSelector implements TrafficSelector {
private final List<Criterion> selector; private final Set<Criterion> selector;
private DefaultTrafficSelector(List<Criterion> selector) { private DefaultTrafficSelector(Set<Criterion> selector) {
this.selector = Collections.unmodifiableList(selector); this.selector = Collections.unmodifiableSet(selector);
} }
@Override @Override
public List<Criterion> criteria() { public Set<Criterion> criteria() {
return selector; return selector;
} }
@Override
public int hashCode() {
return Objects.hash(selector);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof DefaultTrafficSelector) {
DefaultTrafficSelector that = (DefaultTrafficSelector) obj;
return Objects.equals(selector, that.selector);
}
return false;
}
public static class Builder implements TrafficSelector.Builder { public static class Builder implements TrafficSelector.Builder {
private final Logger log = getLogger(getClass()); private final Logger log = getLogger(getClass());
private final List<Criterion> selector = new LinkedList<>(); private final Set<Criterion> selector = new HashSet<>();
@Override @Override
public Builder add(Criterion criterion) { public Builder add(Criterion criterion) {
@ -39,38 +60,47 @@ public final class DefaultTrafficSelector implements TrafficSelector {
return this; return this;
} }
@Override
public Builder matchInport(PortNumber port) { public Builder matchInport(PortNumber port) {
return add(Criteria.matchInPort(port)); return add(Criteria.matchInPort(port));
} }
@Override
public Builder matchEthSrc(MacAddress addr) { public Builder matchEthSrc(MacAddress addr) {
return add(Criteria.matchEthSrc(addr)); return add(Criteria.matchEthSrc(addr));
} }
@Override
public Builder matchEthDst(MacAddress addr) { public Builder matchEthDst(MacAddress addr) {
return add(Criteria.matchEthDst(addr)); return add(Criteria.matchEthDst(addr));
} }
@Override
public Builder matchEthType(short ethType) { public Builder matchEthType(short ethType) {
return add(Criteria.matchEthType(ethType)); return add(Criteria.matchEthType(ethType));
} }
@Override
public Builder matchVlanId(VlanId vlanId) { public Builder matchVlanId(VlanId vlanId) {
return add(Criteria.matchVlanId(vlanId)); return add(Criteria.matchVlanId(vlanId));
} }
@Override
public Builder matchVlanPcp(Byte vlanPcp) { public Builder matchVlanPcp(Byte vlanPcp) {
return add(Criteria.matchVlanPcp(vlanPcp)); return add(Criteria.matchVlanPcp(vlanPcp));
} }
@Override
public Builder matchIPProtocol(Byte proto) { public Builder matchIPProtocol(Byte proto) {
return add(Criteria.matchIPProtocol(proto)); return add(Criteria.matchIPProtocol(proto));
} }
@Override
public Builder matchIPSrc(IpPrefix ip) { public Builder matchIPSrc(IpPrefix ip) {
return add(Criteria.matchIPSrc(ip)); return add(Criteria.matchIPSrc(ip));
} }
@Override
public Builder matchIPDst(IpPrefix ip) { public Builder matchIPDst(IpPrefix ip) {
return add(Criteria.matchIPDst(ip)); return add(Criteria.matchIPDst(ip));
} }

View File

@ -9,6 +9,7 @@ import org.onlab.onos.net.DeviceId;
*/ */
public interface FlowRule { public interface FlowRule {
static final int MAX_TIMEOUT = 60;
public enum FlowRuleState { public enum FlowRuleState {
/** /**
@ -112,10 +113,9 @@ public interface FlowRule {
long bytes(); long bytes();
/** /**
* Indicates that this flow has expired at the device. * Returns the timeout for this flow requested by an application.
* * @return integer value of the timeout
* @return true if it has expired, false otherwise
*/ */
boolean expired(); int timeout();
} }

View File

@ -8,6 +8,8 @@ import org.onlab.onos.net.provider.Provider;
*/ */
public interface FlowRuleProvider extends Provider { public interface FlowRuleProvider extends Provider {
static final int POLL_INTERVAL = 5;
/** /**
* Instructs the provider to apply the specified flow rules to their * Instructs the provider to apply the specified flow rules to their
* respective devices. * respective devices.

View File

@ -16,27 +16,6 @@ public interface FlowRuleProviderService extends ProviderService<FlowRuleProvide
*/ */
void flowRemoved(FlowRule flowRule); void flowRemoved(FlowRule flowRule);
/**
* Signals that a flow rule is missing for some network traffic.
*
* @param flowRule information about traffic in need of flow rule(s)
*/
void flowMissing(FlowRule flowRule);
/**
* Signals that a flow rule is on the switch but not in the store.
*
* @param flowRule the extra flow rule
*/
void extraneousFlow(FlowRule flowRule);
/**
* Signals that a flow rule was indeed added.
*
* @param flowRule the added flow rule
*/
void flowAdded(FlowRule flowRule);
/** /**
* Pushes the collection of flow entries currently applied on the given * Pushes the collection of flow entries currently applied on the given
* device. * device.

View File

@ -1,6 +1,6 @@
package org.onlab.onos.net.flow; package org.onlab.onos.net.flow;
import java.util.List; import java.util.Set;
import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.criteria.Criterion; import org.onlab.onos.net.flow.criteria.Criterion;
@ -18,7 +18,7 @@ public interface TrafficSelector {
* *
* @return list of criteria * @return list of criteria
*/ */
List<Criterion> criteria(); Set<Criterion> criteria();
/** /**
* Builder of traffic selector entities. * Builder of traffic selector entities.

View File

@ -2,6 +2,8 @@ package org.onlab.onos.net.flow.criteria;
import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Objects;
import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.criteria.Criterion.Type; import org.onlab.onos.net.flow.criteria.Criterion.Type;
import org.onlab.packet.IpPrefix; import org.onlab.packet.IpPrefix;
@ -137,6 +139,25 @@ public final class Criteria {
return toStringHelper(type().toString()) return toStringHelper(type().toString())
.add("port", port).toString(); .add("port", port).toString();
} }
@Override
public int hashCode() {
return Objects.hash(port);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof PortCriterion) {
PortCriterion that = (PortCriterion) obj;
return Objects.equals(port, that.port);
}
return false;
}
} }
@ -164,6 +185,27 @@ public final class Criteria {
.add("mac", mac).toString(); .add("mac", mac).toString();
} }
@Override
public int hashCode() {
return Objects.hash(mac, type);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof EthCriterion) {
EthCriterion that = (EthCriterion) obj;
return Objects.equals(mac, that.mac) &&
Objects.equals(type, that.type);
}
return false;
}
} }
public static final class EthTypeCriterion implements Criterion { public static final class EthTypeCriterion implements Criterion {
@ -189,6 +231,25 @@ public final class Criteria {
.add("ethType", Long.toHexString(ethType)).toString(); .add("ethType", Long.toHexString(ethType)).toString();
} }
@Override
public int hashCode() {
return Objects.hash(ethType);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof EthTypeCriterion) {
EthTypeCriterion that = (EthTypeCriterion) obj;
return Objects.equals(ethType, that.ethType);
}
return false;
}
} }
@ -217,6 +278,26 @@ public final class Criteria {
.add("ip", ip).toString(); .add("ip", ip).toString();
} }
@Override
public int hashCode() {
return Objects.hash(ip, type);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof IPCriterion) {
IPCriterion that = (IPCriterion) obj;
return Objects.equals(ip, that.ip) &&
Objects.equals(type, that.type);
}
return false;
}
} }
@ -243,6 +324,25 @@ public final class Criteria {
.add("protocol", Long.toHexString(proto)).toString(); .add("protocol", Long.toHexString(proto)).toString();
} }
@Override
public int hashCode() {
return Objects.hash(proto);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof IPProtocolCriterion) {
IPProtocolCriterion that = (IPProtocolCriterion) obj;
return Objects.equals(proto, that.proto);
}
return false;
}
} }
@ -269,6 +369,25 @@ public final class Criteria {
.add("pcp", Long.toHexString(vlanPcp)).toString(); .add("pcp", Long.toHexString(vlanPcp)).toString();
} }
@Override
public int hashCode() {
return Objects.hash(vlanPcp);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof VlanPcpCriterion) {
VlanPcpCriterion that = (VlanPcpCriterion) obj;
return Objects.equals(vlanPcp, that.vlanPcp);
}
return false;
}
} }
@ -296,6 +415,25 @@ public final class Criteria {
.add("id", vlanId).toString(); .add("id", vlanId).toString();
} }
@Override
public int hashCode() {
return Objects.hash(vlanId);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof VlanIdCriterion) {
VlanIdCriterion that = (VlanIdCriterion) obj;
return Objects.equals(vlanId, that.vlanId);
}
return false;
}
} }

View File

@ -5,6 +5,9 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
@ -59,6 +62,8 @@ implements FlowRuleService, FlowRuleProviderRegistry {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService; protected DeviceService deviceService;
private final Map<FlowRule, AtomicInteger> deadRounds = new ConcurrentHashMap<>();
@Activate @Activate
public void activate() { public void activate() {
store.setDelegate(delegate); store.setDelegate(delegate);
@ -84,6 +89,7 @@ implements FlowRuleService, FlowRuleProviderRegistry {
FlowRule f = flowRules[i]; FlowRule f = flowRules[i];
final Device device = deviceService.getDevice(f.deviceId()); final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId()); final FlowRuleProvider frp = getProvider(device.providerId());
deadRounds.put(f, new AtomicInteger(0));
store.storeFlowRule(f); store.storeFlowRule(f);
frp.applyFlowRule(f); frp.applyFlowRule(f);
} }
@ -98,6 +104,7 @@ implements FlowRuleService, FlowRuleProviderRegistry {
f = flowRules[i]; f = flowRules[i];
device = deviceService.getDevice(f.deviceId()); device = deviceService.getDevice(f.deviceId());
frp = getProvider(device.providerId()); frp = getProvider(device.providerId());
deadRounds.remove(f);
store.deleteFlowRule(f); store.deleteFlowRule(f);
frp.removeFlowRule(f); frp.removeFlowRule(f);
} }
@ -161,11 +168,7 @@ implements FlowRuleService, FlowRuleProviderRegistry {
switch (stored.state()) { switch (stored.state()) {
case ADDED: case ADDED:
case PENDING_ADD: case PENDING_ADD:
if (flowRule.expired()) {
event = store.removeFlowRule(flowRule);
} else {
frp.applyFlowRule(stored); frp.applyFlowRule(stored);
}
break; break;
case PENDING_REMOVE: case PENDING_REMOVE:
case REMOVED: case REMOVED:
@ -181,8 +184,8 @@ implements FlowRuleService, FlowRuleProviderRegistry {
} }
} }
@Override
public void flowMissing(FlowRule flowRule) { private void flowMissing(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL); checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity(); checkValidity();
Device device = deviceService.getDevice(flowRule.deviceId()); Device device = deviceService.getDevice(flowRule.deviceId());
@ -209,29 +212,47 @@ implements FlowRuleService, FlowRuleProviderRegistry {
} }
@Override
public void extraneousFlow(FlowRule flowRule) { private void extraneousFlow(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL); checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity(); checkValidity();
removeFlowRules(flowRule); removeFlowRules(flowRule);
log.debug("Flow {} is on switch but not in store.", flowRule); log.debug("Flow {} is on switch but not in store.", flowRule);
} }
@Override
public void flowAdded(FlowRule flowRule) { private void flowAdded(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL); checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity(); checkValidity();
FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule); if (deadRounds.containsKey(flowRule) &&
if (event == null) { checkRuleLiveness(flowRule, store.getFlowRule(flowRule))) {
log.debug("No flow store event generated.");
FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule);
if (event == null) {
log.debug("No flow store event generated.");
} else {
log.debug("Flow {} {}", flowRule, event.type());
post(event);
}
} else { } else {
log.debug("Flow {} {}", flowRule, event.type()); removeFlowRules(flowRule);
post(event);
} }
} }
private boolean checkRuleLiveness(FlowRule swRule, FlowRule storedRule) {
int timeout = storedRule.timeout();
if (storedRule.packets() != swRule.packets()) {
deadRounds.get(swRule).set(0);
return true;
}
return (deadRounds.get(swRule).getAndIncrement() *
FlowRuleProvider.POLL_INTERVAL) <= timeout;
}
// Posts the specified event to the local event dispatcher. // Posts the specified event to the local event dispatcher.
private void post(FlowRuleEvent event) { private void post(FlowRuleEvent event) {
if (event != null) { if (event != null) {

View File

@ -1,5 +1,10 @@
package org.onlab.onos.net.host.impl; package org.onlab.onos.net.host.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
@ -12,6 +17,7 @@ import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId; import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Host; import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId; import org.onlab.onos.net.HostId;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.host.HostAdminService; import org.onlab.onos.net.host.HostAdminService;
import org.onlab.onos.net.host.HostDescription; import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.host.HostEvent; import org.onlab.onos.net.host.HostEvent;
@ -23,6 +29,7 @@ import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.host.HostStore; import org.onlab.onos.net.host.HostStore;
import org.onlab.onos.net.host.HostStoreDelegate; import org.onlab.onos.net.host.HostStoreDelegate;
import org.onlab.onos.net.host.PortAddresses; import org.onlab.onos.net.host.PortAddresses;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.provider.AbstractProviderRegistry; import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService; import org.onlab.onos.net.provider.AbstractProviderService;
import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress;
@ -31,11 +38,6 @@ import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId; import org.onlab.packet.VlanId;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/** /**
* Provides basic implementation of the host SB &amp; NB APIs. * Provides basic implementation of the host SB &amp; NB APIs.
*/ */
@ -59,12 +61,22 @@ public class HostManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher; protected EventDeliveryService eventDispatcher;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
private HostMonitor monitor;
@Activate @Activate
public void activate() { public void activate() {
log.info("Started");
store.setDelegate(delegate); store.setDelegate(delegate);
eventDispatcher.addSink(HostEvent.class, listenerRegistry); eventDispatcher.addSink(HostEvent.class, listenerRegistry);
log.info("Started");
monitor = new HostMonitor(deviceService, packetService, this);
} }
@Deactivate @Deactivate
@ -76,6 +88,8 @@ public class HostManager
@Override @Override
protected HostProviderService createProviderService(HostProvider provider) { protected HostProviderService createProviderService(HostProvider provider) {
monitor.registerHostProvider(provider);
return new InternalHostProviderService(provider); return new InternalHostProviderService(provider);
} }
@ -126,12 +140,12 @@ public class HostManager
@Override @Override
public void startMonitoringIp(IpAddress ip) { public void startMonitoringIp(IpAddress ip) {
// TODO pass through to HostMonitor monitor.addMonitoringFor(ip);
} }
@Override @Override
public void stopMonitoringIp(IpAddress ip) { public void stopMonitoringIp(IpAddress ip) {
// TODO pass through to HostMonitor monitor.stopMonitoring(ip);
} }
@Override @Override

View File

@ -2,10 +2,11 @@ package org.onlab.onos.net.host.impl;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.jboss.netty.util.Timeout; import org.jboss.netty.util.Timeout;
@ -21,19 +22,19 @@ import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.instructions.Instruction; import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions; import org.onlab.onos.net.flow.instructions.Instructions;
import org.onlab.onos.net.host.HostProvider; import org.onlab.onos.net.host.HostProvider;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.host.HostStore;
import org.onlab.onos.net.host.PortAddresses; import org.onlab.onos.net.host.PortAddresses;
import org.onlab.onos.net.packet.DefaultOutboundPacket; import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.OutboundPacket; import org.onlab.onos.net.packet.OutboundPacket;
import org.onlab.onos.net.packet.PacketService; import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.topology.TopologyService; import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.ARP; import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet; import org.onlab.packet.Ethernet;
import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix; import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress; import org.onlab.packet.MacAddress;
import org.onlab.util.Timer; import org.onlab.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Monitors hosts on the dataplane to detect changes in host data. * Monitors hosts on the dataplane to detect changes in host data.
@ -43,9 +44,7 @@ import org.onlab.util.Timer;
* probe for hosts that have not yet been detected (specified by IP address). * probe for hosts that have not yet been detected (specified by IP address).
*/ */
public class HostMonitor implements TimerTask { public class HostMonitor implements TimerTask {
private static final Logger log = LoggerFactory.getLogger(HostMonitor.class);
private static final byte[] DEFAULT_MAC_ADDRESS =
MacAddress.valueOf("00:00:00:00:00:01").getAddress();
private static final byte[] ZERO_MAC_ADDRESS = private static final byte[] ZERO_MAC_ADDRESS =
MacAddress.valueOf("00:00:00:00:00:00").getAddress(); MacAddress.valueOf("00:00:00:00:00:00").getAddress();
@ -54,59 +53,77 @@ public class HostMonitor implements TimerTask {
private static final byte[] BROADCAST_MAC = private static final byte[] BROADCAST_MAC =
MacAddress.valueOf("ff:ff:ff:ff:ff:ff").getAddress(); MacAddress.valueOf("ff:ff:ff:ff:ff:ff").getAddress();
private final HostService hostService; private DeviceService deviceService;
private final TopologyService topologyService; private PacketService packetService;
private final DeviceService deviceService; private HostManager hostManager;
private final HostProvider hostProvider;
private final PacketService packetService;
private final HostStore hostStore;
private final Set<IpAddress> monitoredAddresses; private final Set<IpAddress> monitoredAddresses;
private final Map<ProviderId, HostProvider> hostProviders;
private final long probeRate; private final long probeRate;
private final Timeout timeout; private final Timeout timeout;
public HostMonitor(HostService hostService, TopologyService topologyService, public HostMonitor(
DeviceService deviceService, DeviceService deviceService,
HostProvider hostProvider, PacketService packetService, PacketService packetService,
HostStore hostStore) { HostManager hostService) {
this.hostService = hostService;
this.topologyService = topologyService;
this.deviceService = deviceService; this.deviceService = deviceService;
this.hostProvider = hostProvider;
this.packetService = packetService; this.packetService = packetService;
this.hostStore = hostStore; this.hostManager = hostService;
monitoredAddresses = new HashSet<>(); monitoredAddresses = new HashSet<>();
hostProviders = new ConcurrentHashMap<>();
probeRate = 30000; // milliseconds probeRate = 30000; // milliseconds
timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS); timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS);
addDefaultAddresses();
} }
public void addMonitoringFor(IpAddress ip) { private void addDefaultAddresses() {
//monitoredAddresses.add(IpAddress.valueOf("10.0.0.1"));
}
void addMonitoringFor(IpAddress ip) {
monitoredAddresses.add(ip); monitoredAddresses.add(ip);
} }
public void stopMonitoring(IpAddress ip) { void stopMonitoring(IpAddress ip) {
monitoredAddresses.remove(ip); monitoredAddresses.remove(ip);
} }
public void shutdown() { void shutdown() {
timeout.cancel(); timeout.cancel();
} }
void registerHostProvider(HostProvider provider) {
hostProviders.put(provider.id(), provider);
}
void unregisterHostProvider(HostProvider provider) {
// TODO find out how to call this
}
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
for (IpAddress ip : monitoredAddresses) { for (IpAddress ip : monitoredAddresses) {
Set<Host> hosts = Collections.emptySet(); //TODO hostService.getHostsByIp(ip); // TODO have to convert right now because the HostService API uses IpPrefix
IpPrefix prefix = IpPrefix.valueOf(ip.toOctets());
Set<Host> hosts = hostManager.getHostsByIp(prefix);
if (hosts.isEmpty()) { if (hosts.isEmpty()) {
sendArpRequest(ip); sendArpRequest(ip);
} else { } else {
for (Host host : hosts) { for (Host host : hosts) {
hostProvider.triggerProbe(host); HostProvider provider = hostProviders.get(host.providerId());
if (provider != null) {
provider.triggerProbe(host);
}
} }
} }
} }
@ -120,29 +137,26 @@ public class HostMonitor implements TimerTask {
* @param targetIp IP address to ARP for * @param targetIp IP address to ARP for
*/ */
private void sendArpRequest(IpAddress targetIp) { private void sendArpRequest(IpAddress targetIp) {
// Find ports with an IP address in the target's subnet and sent ARP // Find ports with an IP address in the target's subnet and sent ARP
// probes out those ports. // probes out those ports.
for (Device device : deviceService.getDevices()) { for (Device device : deviceService.getDevices()) {
for (Port port : deviceService.getPorts(device.id())) { for (Port port : deviceService.getPorts(device.id())) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number()); ConnectPoint cp = new ConnectPoint(device.id(), port.number());
PortAddresses addresses = hostStore.getAddressBindingsForPort(cp); PortAddresses addresses = hostManager.getAddressBindingsForPort(cp);
/*for (IpPrefix prefix : addresses.ips()) { for (IpPrefix prefix : addresses.ips()) {
if (prefix.contains(targetIp)) { if (prefix.contains(targetIp)) {
sendProbe(device.id(), port, addresses, targetIp); sendProbe(device.id(), port, targetIp,
prefix.toIpAddress(), addresses.mac());
} }
}*/ }
} }
} }
// TODO case where no address was found.
// Broadcast out internal edge ports?
} }
private void sendProbe(DeviceId deviceId, Port port, PortAddresses portAddresses, private void sendProbe(DeviceId deviceId, Port port, IpAddress targetIp,
IpAddress targetIp) { IpAddress sourceIp, MacAddress sourceMac) {
Ethernet arpPacket = createArpFor(targetIp, portAddresses); Ethernet arpPacket = buildArpRequest(targetIp, sourceIp, sourceMac);
List<Instruction> instructions = new ArrayList<>(); List<Instruction> instructions = new ArrayList<>();
instructions.add(Instructions.createOutput(port.number())); instructions.add(Instructions.createOutput(port.number()));
@ -158,31 +172,26 @@ public class HostMonitor implements TimerTask {
packetService.emit(outboundPacket); packetService.emit(outboundPacket);
} }
private Ethernet createArpFor(IpAddress targetIp, PortAddresses portAddresses) { private Ethernet buildArpRequest(IpAddress targetIp, IpAddress sourceIp,
MacAddress sourceMac) {
ARP arp = new ARP(); ARP arp = new ARP();
arp.setHardwareType(ARP.HW_TYPE_ETHERNET) arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH) .setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
.setProtocolType(ARP.PROTO_TYPE_IP) .setProtocolType(ARP.PROTO_TYPE_IP)
.setProtocolAddressLength((byte) IpPrefix.INET_LEN); .setProtocolAddressLength((byte) IpPrefix.INET_LEN)
.setOpCode(ARP.OP_REQUEST);
byte[] sourceMacAddress; arp.setSenderHardwareAddress(sourceMac.getAddress())
if (portAddresses.mac() == null) { .setSenderProtocolAddress(sourceIp.toOctets())
sourceMacAddress = DEFAULT_MAC_ADDRESS; .setTargetHardwareAddress(ZERO_MAC_ADDRESS)
} else { .setTargetProtocolAddress(targetIp.toOctets());
sourceMacAddress = portAddresses.mac().getAddress();
}
arp.setSenderHardwareAddress(sourceMacAddress)
//TODO .setSenderProtocolAddress(portAddresses.ips().toOctets())
.setTargetHardwareAddress(ZERO_MAC_ADDRESS)
.setTargetProtocolAddress(targetIp.toOctets());
Ethernet ethernet = new Ethernet(); Ethernet ethernet = new Ethernet();
ethernet.setEtherType(Ethernet.TYPE_ARP) ethernet.setEtherType(Ethernet.TYPE_ARP)
.setDestinationMACAddress(BROADCAST_MAC) .setDestinationMACAddress(BROADCAST_MAC)
.setSourceMACAddress(sourceMacAddress) .setSourceMACAddress(sourceMac.getAddress())
.setPayload(arp); .setPayload(arp);
return ethernet; return ethernet;
} }

View File

@ -9,7 +9,9 @@ import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED; import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -42,6 +44,7 @@ import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore; import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -52,6 +55,7 @@ public class FlowRuleManagerTest {
private static final ProviderId PID = new ProviderId("of", "foo"); private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID = DeviceId.deviceId("of:001"); private static final DeviceId DID = DeviceId.deviceId("of:001");
private static final int TIMEOUT = 10;
private static final Device DEV = new DefaultDevice( private static final Device DEV = new DefaultDevice(
PID, DID, Type.SWITCH, "", "", "", ""); PID, DID, Type.SWITCH, "", "", "", "");
@ -96,7 +100,7 @@ public class FlowRuleManagerTest {
private FlowRule flowRule(int tsval, int trval) { private FlowRule flowRule(int tsval, int trval) {
TestSelector ts = new TestSelector(tsval); TestSelector ts = new TestSelector(tsval);
TestTreatment tr = new TestTreatment(trval); TestTreatment tr = new TestTreatment(trval);
return new DefaultFlowRule(DID, ts, tr, 0, appId); return new DefaultFlowRule(DID, ts, tr, 0, appId, TIMEOUT);
} }
private FlowRule flowRule(FlowRule rule, FlowRuleState state) { private FlowRule flowRule(FlowRule rule, FlowRuleState state) {
@ -105,7 +109,8 @@ public class FlowRuleManagerTest {
private FlowRule addFlowRule(int hval) { private FlowRule addFlowRule(int hval) {
FlowRule rule = flowRule(hval, hval); FlowRule rule = flowRule(hval, hval);
providerService.flowAdded(rule); service.applyFlowRules(rule);
assertNotNull("rule should be found", service.getFlowEntries(DID)); assertNotNull("rule should be found", service.getFlowEntries(DID));
return rule; return rule;
} }
@ -135,13 +140,18 @@ public class FlowRuleManagerTest {
public void getFlowEntries() { public void getFlowEntries() {
assertTrue("store should be empty", assertTrue("store should be empty",
Sets.newHashSet(service.getFlowEntries(DID)).isEmpty()); Sets.newHashSet(service.getFlowEntries(DID)).isEmpty());
addFlowRule(1); FlowRule f1 = addFlowRule(1);
addFlowRule(2); FlowRule f2 = addFlowRule(2);
assertEquals("2 rules should exist", 2, flowCount()); assertEquals("2 rules should exist", 2, flowCount());
providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2));
validateEvents(RULE_ADDED, RULE_ADDED); validateEvents(RULE_ADDED, RULE_ADDED);
addFlowRule(1); addFlowRule(1);
assertEquals("should still be 2 rules", 2, flowCount()); assertEquals("should still be 2 rules", 2, flowCount());
providerService.pushFlowMetrics(DID, ImmutableList.of(f1));
validateEvents(RULE_UPDATED); validateEvents(RULE_UPDATED);
} }
@ -179,8 +189,10 @@ public class FlowRuleManagerTest {
public void removeFlowRules() { public void removeFlowRules() {
FlowRule f1 = addFlowRule(1); FlowRule f1 = addFlowRule(1);
FlowRule f2 = addFlowRule(2); FlowRule f2 = addFlowRule(2);
addFlowRule(3); FlowRule f3 = addFlowRule(3);
assertEquals("3 rules should exist", 3, flowCount()); assertEquals("3 rules should exist", 3, flowCount());
providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2, f3));
validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED); validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED);
FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED); FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED);
@ -200,8 +212,9 @@ public class FlowRuleManagerTest {
@Test @Test
public void flowRemoved() { public void flowRemoved() {
FlowRule f1 = addFlowRule(1); FlowRule f1 = addFlowRule(1);
FlowRule f2 = addFlowRule(2);
providerService.pushFlowMetrics(f1.deviceId(), ImmutableList.of(f1, f2));
service.removeFlowRules(f1); service.removeFlowRules(f1);
addFlowRule(2);
FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED); FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED);
providerService.flowRemoved(rem1); providerService.flowRemoved(rem1);
validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED); validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
@ -209,9 +222,11 @@ public class FlowRuleManagerTest {
providerService.flowRemoved(rem1); providerService.flowRemoved(rem1);
validateEvents(); validateEvents();
FlowRule f3 = flowRule(flowRule(3, 3), FlowRuleState.ADDED); FlowRule f3 = flowRule(3, 3);
providerService.flowAdded(f3); service.applyFlowRules(f3);
providerService.pushFlowMetrics(f3.deviceId(), Collections.singletonList(f3));
validateEvents(RULE_ADDED); validateEvents(RULE_ADDED);
providerService.flowRemoved(f3); providerService.flowRemoved(f3);
validateEvents(); validateEvents();
} }
@ -223,9 +238,10 @@ public class FlowRuleManagerTest {
FlowRule f3 = flowRule(3, 3); FlowRule f3 = flowRule(3, 3);
mgr.applyFlowRules(f1, f2, f3);
FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED); FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED); FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
mgr.applyFlowRules(f1, f2, f3);
providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2)); providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2));
@ -233,7 +249,7 @@ public class FlowRuleManagerTest {
validateState(FlowRuleState.PENDING_ADD, FlowRuleState.ADDED, validateState(FlowRuleState.PENDING_ADD, FlowRuleState.ADDED,
FlowRuleState.ADDED)); FlowRuleState.ADDED));
validateEvents(RULE_UPDATED, RULE_UPDATED); validateEvents(RULE_ADDED, RULE_ADDED);
} }
@Test @Test
@ -241,15 +257,15 @@ public class FlowRuleManagerTest {
FlowRule f1 = flowRule(1, 1); FlowRule f1 = flowRule(1, 1);
FlowRule f2 = flowRule(2, 2); FlowRule f2 = flowRule(2, 2);
FlowRule f3 = flowRule(3, 3); FlowRule f3 = flowRule(3, 3);
mgr.applyFlowRules(f1, f2);
FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED); FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED); FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
FlowRule updatedF3 = flowRule(f3, FlowRuleState.ADDED); FlowRule updatedF3 = flowRule(f3, FlowRuleState.ADDED);
mgr.applyFlowRules(f1, f2);
providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2, updatedF3)); providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2, updatedF3));
validateEvents(RULE_UPDATED, RULE_UPDATED); validateEvents(RULE_ADDED, RULE_ADDED);
} }
@ -271,7 +287,7 @@ public class FlowRuleManagerTest {
providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2)); providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2));
validateEvents(RULE_UPDATED, RULE_UPDATED, RULE_REMOVED); validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
} }
@ -386,7 +402,7 @@ public class FlowRuleManagerTest {
} }
@Override @Override
public List<Criterion> criteria() { public Set<Criterion> criteria() {
return null; return null;
} }

View File

@ -20,7 +20,7 @@ import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId; import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore; import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager; import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.packet.IpPrefix; import org.onlab.packet.IpPrefix;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,7 +50,7 @@ public class DistributedClusterStore
private final Map<NodeId, State> states = new ConcurrentHashMap<>(); private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder() private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
.maximumSize(1000) .maximumSize(1000)
.expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS) .expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
.removalListener(new LivenessCacheRemovalListener()).build(); .removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

View File

@ -1,5 +1,13 @@
package org.onlab.onos.store.cluster.messaging; package org.onlab.onos.store.cluster.messaging;
/**
* Interface for handling cluster messages.
*/
public interface ClusterMessageHandler { public interface ClusterMessageHandler {
/**
* Handles/Processes the cluster message.
* @param message cluster message.
*/
public void handle(ClusterMessage message); public void handle(ClusterMessage message);
} }

View File

@ -2,6 +2,8 @@ package org.onlab.onos.store.cluster.messaging;
/** /**
* Representation of a message subject. * Representation of a message subject.
* Cluster messages have associated subjects that dictate how they get handled
* on the receiving side.
*/ */
public class MessageSubject { public class MessageSubject {

View File

@ -1,18 +0,0 @@
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Represents a message consumer.
*/
public interface MessageSubscriber {
/**
* Receives the specified cluster message.
*
* @param message message to be received
* @param fromNodeId node from which the message was received
*/
void receive(Object messagePayload, NodeId fromNodeId);
}

View File

@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
@Component(immediate = true) @Component(immediate = true)
@Service @Service
public class OnosClusterCommunicationManager public class ClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService { implements ClusterCommunicationService, ClusterCommunicationAdminService {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());

View File

@ -6,7 +6,7 @@ import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode; import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId; import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager; import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.netty.NettyMessagingService; import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpPrefix; import org.onlab.packet.IpPrefix;
@ -29,8 +29,8 @@ public class ClusterCommunicationManagerTest {
private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1"); private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
private OnosClusterCommunicationManager ccm1; private ClusterCommunicationManager ccm1;
private OnosClusterCommunicationManager ccm2; private ClusterCommunicationManager ccm2;
private TestDelegate cnd1 = new TestDelegate(); private TestDelegate cnd1 = new TestDelegate();
private TestDelegate cnd2 = new TestDelegate(); private TestDelegate cnd2 = new TestDelegate();
@ -46,11 +46,11 @@ public class ClusterCommunicationManagerTest {
NettyMessagingService messagingService = new NettyMessagingService(); NettyMessagingService messagingService = new NettyMessagingService();
messagingService.activate(); messagingService.activate();
ccm1 = new OnosClusterCommunicationManager(); ccm1 = new ClusterCommunicationManager();
// ccm1.serializationService = messageSerializer; // ccm1.serializationService = messageSerializer;
ccm1.activate(); ccm1.activate();
ccm2 = new OnosClusterCommunicationManager(); ccm2 = new ClusterCommunicationManager();
// ccm2.serializationService = messageSerializer; // ccm2.serializationService = messageSerializer;
ccm2.activate(); ccm2.activate();

View File

@ -1,6 +1,5 @@
package org.onlab.onos.store.trivial.impl; package org.onlab.onos.store.trivial.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -116,18 +115,21 @@ public class SimpleFlowRuleStore
DeviceId did = rule.deviceId(); DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry // check if this new rule is an update to an existing entry
if (flowEntries.containsEntry(did, rule)) { FlowRule stored = getFlowRule(rule);
//synchronized (flowEntries) { if (stored != null) {
// Multimaps support duplicates so we have to remove our rule // Multimaps support duplicates so we have to remove our rule
// and replace it with the current version. // and replace it with the current version.
flowEntries.remove(did, rule); flowEntries.remove(did, rule);
flowEntries.put(did, rule); flowEntries.put(did, rule);
//}
if (stored.state() == FlowRuleState.PENDING_ADD) {
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule); return new FlowRuleEvent(Type.RULE_UPDATED, rule);
} }
flowEntries.put(did, rule); flowEntries.put(did, rule);
return new FlowRuleEvent(RULE_ADDED, rule); return null;
} }
@Override @Override
@ -140,11 +142,4 @@ public class SimpleFlowRuleStore
} }
//} //}
} }
} }

View File

@ -11,7 +11,7 @@
<bundle>mvn:io.netty/netty/3.9.2.Final</bundle> <bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.3</bundle> <bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
<bundle>mvn:com.codahale.metrics/metrics-core/3.0.2</bundle> <bundle>mvn:io.dropwizard.metrics/metrics-core/3.1.0</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle> <bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
<bundle>mvn:com.esotericsoftware.kryo/kryo/2.24.0</bundle> <bundle>mvn:com.esotericsoftware.kryo/kryo/2.24.0</bundle>

View File

@ -169,7 +169,12 @@ public class OpenFlowControllerImpl implements OpenFlowController {
@Override @Override
public void setRole(Dpid dpid, RoleState role) { public void setRole(Dpid dpid, RoleState role) {
getSwitch(dpid).setRole(role); final OpenFlowSwitch sw = getSwitch(dpid);
if (sw == null) {
log.debug("Switch not connected. Ignoring setRole({}, {})", dpid, role);
return;
}
sw.setRole(role);
} }
/** /**

View File

@ -248,6 +248,11 @@
<classifier>tests</classifier> <classifier>tests</classifier>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

View File

@ -77,7 +77,6 @@ public class FlowModBuilder {
.setCookie(U64.of(cookie.value())) .setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER) .setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions) .setActions(actions)
.setIdleTimeout(10)
.setMatch(match) .setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM)) .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority) .setPriority(priority)

View File

@ -71,7 +71,7 @@ public class FlowRuleBuilder {
buildSelector(), buildTreatment(), stat.getPriority(), buildSelector(), buildTreatment(), stat.getPriority(),
FlowRuleState.ADDED, stat.getDurationNsec() / 1000000, FlowRuleState.ADDED, stat.getDurationNsec() / 1000000,
stat.getPacketCount().getValue(), stat.getByteCount().getValue(), stat.getPacketCount().getValue(), stat.getByteCount().getValue(),
stat.getCookie().getValue(), false); stat.getCookie().getValue(), false, stat.getIdleTimeout());
} else { } else {
// TODO: revisit potentially. // TODO: revisit potentially.
return new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)), return new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
@ -79,7 +79,8 @@ public class FlowRuleBuilder {
FlowRuleState.REMOVED, removed.getDurationNsec() / 1000000, FlowRuleState.REMOVED, removed.getDurationNsec() / 1000000,
removed.getPacketCount().getValue(), removed.getByteCount().getValue(), removed.getPacketCount().getValue(), removed.getByteCount().getValue(),
removed.getCookie().getValue(), removed.getCookie().getValue(),
removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal()); removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal(),
stat.getIdleTimeout());
} }
} }

View File

@ -127,7 +127,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override @Override
public void switchAdded(Dpid dpid) { public void switchAdded(Dpid dpid) {
FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), 5); FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
fsc.start(); fsc.start();
collectors.put(dpid, fsc); collectors.put(dpid, fsc);
} }

View File

@ -176,7 +176,7 @@
</module> </module>
<module name="ParameterNumber"> <module name="ParameterNumber">
<property name="max" value="10"/> <property name="max" value="15"/>
<property name="tokens" value="CTOR_DEF"/> <property name="tokens" value="CTOR_DEF"/>
</module> </module>
<!-- Checks for whitespace --> <!-- Checks for whitespace -->

View File

@ -56,9 +56,13 @@
<artifactId>objenesis</artifactId> <artifactId>objenesis</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.codahale.metrics</groupId> <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId> <artifactId>metrics-core</artifactId>
<version>3.0.2</version> <version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -1,10 +1,18 @@
package org.onlab.metrics; package org.onlab.metrics;
import java.io.File;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge; import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram; import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
@ -45,24 +53,44 @@ import com.codahale.metrics.Timer;
* </code> * </code>
* </pre> * </pre>
*/ */
@Component(immediate = true)
public final class MetricsManager implements MetricsService { public final class MetricsManager implements MetricsService {
/** /**
* Registry to hold the Components defined in the system. * Registry to hold the Components defined in the system.
*/ */
private ConcurrentMap<String, MetricsComponent> componentsRegistry = private ConcurrentMap<String, MetricsComponent> componentsRegistry;
new ConcurrentHashMap<>();
/** /**
* Registry for the Metrics objects created in the system. * Registry for the Metrics objects created in the system.
*/ */
private final MetricRegistry metricsRegistry = new MetricRegistry(); private final MetricRegistry metricsRegistry;
/** /**
* Hide constructor. The only way to get the registry is through the * Default Reporter for this metrics manager.
* singleton getter.
*/ */
private MetricsManager() {} private final CsvReporter reporter;
public MetricsManager() {
this.componentsRegistry = new ConcurrentHashMap<>();
this.metricsRegistry = new MetricRegistry();
this.reporter = CsvReporter.forRegistry(metricsRegistry)
.formatFor(Locale.US)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MICROSECONDS)
.build(new File("/tmp/"));
reporter.start(10, TimeUnit.SECONDS);
}
@Activate
public void activate() {
}
@Deactivate
public void deactivate() {
}
/** /**
* Registers a component. * Registers a component.

View File

@ -250,6 +250,17 @@ public final class IpPrefix {
return new IpPrefix(version, host, netmask); return new IpPrefix(version, host, netmask);
} }
/**
* Returns an IpAddress of the bytes contained in this prefix.
* FIXME this is a hack for now and only works because IpPrefix doesn't
* mask the input bytes on creation.
*
* @return the IpAddress
*/
public IpAddress toIpAddress() {
return IpAddress.valueOf(octets);
}
public boolean isMasked() { public boolean isMasked() {
return mask() != 0; return mask() != 0;
} }
@ -278,6 +289,17 @@ public final class IpPrefix {
return false; return false;
} }
public boolean contains(IpAddress address) {
// Need to get the network address because prefixes aren't automatically
// masked on creation
IpPrefix meMasked = network();
IpPrefix otherMasked =
IpPrefix.valueOf(address.octets, netmask).network();
return Arrays.equals(meMasked.octets, otherMasked.octets);
}
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31; final int prime = 31;
@ -303,6 +325,7 @@ public final class IpPrefix {
if (netmask != other.netmask) { if (netmask != other.netmask) {
return false; return false;
} }
// TODO not quite right until we mask the input
if (!Arrays.equals(octets, other.octets)) { if (!Arrays.equals(octets, other.octets)) {
return false; return false;
} }

View File

@ -76,7 +76,7 @@ public class IpPrefixTest {
} }
@Test @Test
public void testContains() { public void testContainsIpPrefix() {
IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31); IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31);
IpPrefix slash32 = IpPrefix.valueOf(BYTES1, 32); IpPrefix slash32 = IpPrefix.valueOf(BYTES1, 32);
IpPrefix differentSlash32 = IpPrefix.valueOf(BYTES2, 32); IpPrefix differentSlash32 = IpPrefix.valueOf(BYTES2, 32);
@ -96,4 +96,17 @@ public class IpPrefixTest {
assertTrue(slash8.contains(slash31)); assertTrue(slash8.contains(slash31));
assertFalse(slash31.contains(slash8)); assertFalse(slash31.contains(slash8));
} }
@Test
public void testContainsIpAddress() {
IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31);
IpAddress slash32 = IpAddress.valueOf(BYTES1, 32);
assertTrue(slash31.contains(slash32));
IpPrefix intf = IpPrefix.valueOf("192.168.10.101/24");
IpAddress addr = IpAddress.valueOf("192.168.10.1");
assertTrue(intf.contains(addr));
}
} }

View File

@ -42,7 +42,6 @@
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,68 @@
package org.onlab.netty;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* An asynchronous response.
* This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet.
* @param <T> type of response.
*/
public class AsyncResponse<T> implements Response<T> {
private T value;
private boolean done = false;
private final long start = System.nanoTime();
@Override
public T get(long timeout, TimeUnit tu) throws TimeoutException {
timeout = tu.toNanos(timeout);
boolean interrupted = false;
try {
synchronized (this) {
while (!done) {
try {
long timeRemaining = timeout - (System.nanoTime() - start);
if (timeRemaining <= 0) {
throw new TimeoutException("Operation timed out.");
}
TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
} catch (InterruptedException e) {
interrupted = true;
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
return value;
}
@Override
public T get() throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean isReady() {
return done;
}
/**
* Sets response value and unblocks any thread blocking on the response to become
* available.
* @param data response data.
*/
@SuppressWarnings("unchecked")
public synchronized void setResponse(Object data) {
if (!done) {
done = true;
value = (T) data;
this.notifyAll();
}
}
}

View File

@ -0,0 +1,15 @@
package org.onlab.netty;
import java.io.IOException;
/**
* Message handler that echos the message back to the sender.
*/
public class EchoHandler implements MessageHandler {
@Override
public void handle(Message message) throws IOException {
System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
message.respond(message.payload());
}
}

View File

@ -0,0 +1,62 @@
package org.onlab.netty;
/**
* Representation of a TCP/UDP communication end point.
*/
public class Endpoint {
private final int port;
private final String host;
public Endpoint(String host, int port) {
this.host = host;
this.port = port;
}
public String host() {
return host;
}
public int port() {
return port;
}
@Override
public String toString() {
return "Endpoint [port=" + port + ", host=" + host + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Endpoint other = (Endpoint) obj;
if (host == null) {
if (other.host != null) {
return false;
}
} else if (!host.equals(other.host)) {
return false;
}
if (port != other.port) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,85 @@
package org.onlab.netty;
import java.io.IOException;
/**
* Internal message representation with additional attributes
* for supporting, synchronous request/reply behavior.
*/
public final class InternalMessage implements Message {
private long id;
private Endpoint sender;
private String type;
private Object payload;
private transient NettyMessagingService messagingService;
public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
// Must be created using the Builder.
private InternalMessage() {}
public long id() {
return id;
}
public String type() {
return type;
}
public Endpoint sender() {
return sender;
}
@Override
public Object payload() {
return payload;
}
@Override
public void respond(Object data) throws IOException {
Builder builder = new Builder(messagingService);
InternalMessage message = builder.withId(this.id)
// FIXME: Sender should be messagingService.localEp.
.withSender(this.sender)
.withPayload(data)
.withType(REPLY_MESSAGE_TYPE)
.build();
messagingService.sendAsync(sender, message);
}
/**
* Builder for InternalMessages.
*/
public static class Builder {
private InternalMessage message;
public Builder(NettyMessagingService messagingService) {
message = new InternalMessage();
message.messagingService = messagingService;
}
public Builder withId(long id) {
message.id = id;
return this;
}
public Builder withType(String type) {
message.type = type;
return this;
}
public Builder withSender(Endpoint sender) {
message.sender = sender;
return this;
}
public Builder withPayload(Object payload) {
message.payload = payload;
return this;
}
public InternalMessage build() {
return message;
}
}
}

View File

@ -0,0 +1,47 @@
package org.onlab.netty;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Kryo Serializer.
*/
public class KryoSerializer implements Serializer {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
public KryoSerializer() {
setupKryoPool();
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ArrayList.class
)
.build()
.populate(1);
}
@Override
public Object decode(byte[] data) {
return serializerPool.deserialize(data);
}
@Override
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
}

View File

@ -0,0 +1,12 @@
package org.onlab.netty;
/**
* A MessageHandler that simply logs the information.
*/
public class LoggingHandler implements MessageHandler {
@Override
public void handle(Message message) {
System.out.println("Received: " + message.payload());
}
}

View File

@ -0,0 +1,23 @@
package org.onlab.netty;
import java.io.IOException;
/**
* A unit of communication.
* Has a payload. Also supports a feature to respond back to the sender.
*/
public interface Message {
/**
* Returns the payload of this message.
* @return message payload.
*/
public Object payload();
/**
* Sends a reply back to the sender of this messge.
* @param data payload of the response.
* @throws IOException if there is a communication error.
*/
public void respond(Object data) throws IOException;
}

View File

@ -0,0 +1,58 @@
package org.onlab.netty;
import java.util.Arrays;
import java.util.List;
import static com.google.common.base.Preconditions.checkState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
* Decode bytes into a InternalMessage.
*/
public class MessageDecoder extends ByteToMessageDecoder {
private final NettyMessagingService messagingService;
private final Serializer serializer;
public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
this.messagingService = messagingService;
this.serializer = serializer;
}
@Override
protected void decode(ChannelHandlerContext context, ByteBuf in,
List<Object> messages) throws Exception {
byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
// read message Id.
long id = in.readLong();
// read message type; first read size and then bytes.
String type = new String(in.readBytes(in.readInt()).array());
// read sender host name; first read size and then bytes.
String host = new String(in.readBytes(in.readInt()).array());
// read sender port.
int port = in.readInt();
Endpoint sender = new Endpoint(host, port);
// read message payload; first read size and then bytes.
Object payload = serializer.decode(in.readBytes(in.readInt()).array());
InternalMessage message = new InternalMessage.Builder(messagingService)
.withId(id)
.withSender(sender)
.withType(type)
.withPayload(payload)
.build();
messages.add(message);
}
}

View File

@ -0,0 +1,60 @@
package org.onlab.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Encode InternalMessage out into a byte buffer.
*/
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
private final Serializer serializer;
public MessageEncoder(Serializer serializer) {
this.serializer = serializer;
}
@Override
protected void encode(ChannelHandlerContext context, InternalMessage message,
ByteBuf out) throws Exception {
// write preamble
out.writeBytes(PREAMBLE);
// write id
out.writeLong(message.id());
// write type length
out.writeInt(message.type().length());
// write type
out.writeBytes(message.type().getBytes());
// write sender host name size
out.writeInt(message.sender().host().length());
// write sender host name.
out.writeBytes(message.sender().host().getBytes());
// write port
out.writeInt(message.sender().port());
try {
serializer.encode(message.payload());
} catch (Exception e) {
e.printStackTrace();
}
byte[] payload = serializer.encode(message.payload());
// write payload length.
out.writeInt(payload.length);
// write payload bytes
out.writeBytes(payload);
}
}

View File

@ -0,0 +1,16 @@
package org.onlab.netty;
import java.io.IOException;
/**
* Handler for a message.
*/
public interface MessageHandler {
/**
* Handles the message.
* @param message message.
* @throws IOException.
*/
public void handle(Message message) throws IOException;
}

View File

@ -0,0 +1,41 @@
package org.onlab.netty;
import java.io.IOException;
/**
* Interface for low level messaging primitives.
*/
public interface MessagingService {
/**
* Sends a message asynchronously to the specified communication end point.
* The message is specified using the type and payload.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
* @throws IOException
*/
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
/**
* Sends a message synchronously and waits for a response.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
* @return a response future
* @throws IOException
*/
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
/**
* Registers a new message handler for message type.
* @param type message type.
* @param handler message handler
*/
public void registerHandler(String type, MessageHandler handler);
/**
* Unregister current handler, if one exists for message type.
* @param type message type
*/
public void unregisterHandler(String type);
}

View File

@ -0,0 +1,244 @@
package org.onlab.netty;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* A Netty based implementation of MessagingService.
*/
public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private KeyedObjectPool<Endpoint, Channel> channels =
new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
private final int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private Cache<Long, AsyncResponse<?>> responseFutures;
private final Endpoint localEp;
protected Serializer serializer;
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
}
// FIXME: Constructor should not throw exceptions.
public NettyMessagingService(int port) {
this.port = port;
try {
localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
} catch (UnknownHostException e) {
// bailing out.
throw new RuntimeException(e);
}
}
public void activate() throws Exception {
responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
startAcceptingConnections();
}
public void deactivate() throws Exception {
channels.close();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
@Override
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
.withId(RandomUtils.nextLong())
.withSender(localEp)
.withType(type)
.withPayload(payload)
.build();
sendAsync(ep, message);
}
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
Channel channel = null;
try {
channel = channels.borrowObject(ep);
channel.eventLoop().execute(new WriteTask(channel, message));
} catch (Exception e) {
throw new IOException(e);
} finally {
try {
channels.returnObject(ep, channel);
} catch (Exception e) {
log.warn("Error returning object back to the pool", e);
// ignored.
}
}
}
@Override
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
throws IOException {
AsyncResponse<T> futureResponse = new AsyncResponse<T>();
Long messageId = RandomUtils.nextLong();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
.withType(type)
.withPayload(payload)
.build();
sendAsync(ep, message);
return futureResponse;
}
@Override
public void registerHandler(String type, MessageHandler handler) {
// TODO: Is this the right semantics for handler registration?
handlers.putIfAbsent(type, handler);
}
public void unregisterHandler(String type) {
handlers.remove(type);
}
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new OnosCommunicationChannelInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
b.bind(port).sync();
}
private class OnosCommunicationChannelFactory
implements KeyedPoolableObjectFactory<Endpoint, Channel> {
@Override
public void activateObject(Endpoint endpoint, Channel channel)
throws Exception {
}
@Override
public void destroyObject(Endpoint ep, Channel channel) throws Exception {
channel.close();
}
@Override
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap b = new Bootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new OnosCommunicationChannelInitializer());
// Start the client.
ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
return f.channel();
}
@Override
public void passivateObject(Endpoint ep, Channel channel)
throws Exception {
}
@Override
public boolean validateObject(Endpoint ep, Channel channel) {
return channel.isOpen();
}
}
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new MessageEncoder(serializer))
.addLast(new MessageDecoder(NettyMessagingService.this, serializer))
.addLast(new NettyMessagingService.InboundMessageDispatcher());
}
}
private class WriteTask implements Runnable {
private final Object message;
private final Channel channel;
public WriteTask(Channel channel, Object message) {
this.message = message;
this.channel = channel;
}
@Override
public void run() {
channel.writeAndFlush(message);
}
}
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
AsyncResponse<?> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.setResponse(message.payload());
}
log.warn("Received a reply. But was unable to locate the request handle");
} finally {
NettyMessagingService.this.responseFutures.invalidate(message.id());
}
return;
}
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
handler.handle(message);
}
}
}

View File

@ -0,0 +1,36 @@
package org.onlab.netty;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Response object returned when making synchronous requests.
* Can you used to check is a response is ready and/or wait for a response
* to become available.
*
* @param <T> type of response.
*/
public interface Response<T> {
/**
* Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out)
* @param tu unit of time.
* @return response
* @throws TimeoutException if the timeout expires before the response arrives.
*/
public T get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Gets the response waiting for indefinite timeout period.
* @return response
* @throws InterruptedException if the thread is interrupted before the response arrives.
*/
public T get() throws InterruptedException;
/**
* Checks if the response is ready without blocking.
* @return true if response is ready, false otherwise.
*/
public boolean isReady();
}

View File

@ -0,0 +1,24 @@
package org.onlab.netty;
/**
* Interface for encoding/decoding message payloads.
*/
public interface Serializer {
/**
* Decodes the specified byte array to a POJO.
*
* @param data byte array.
* @return POJO
*/
Object decode(byte[] data);
/**
* Encodes the specified POJO into a byte array.
*
* @param data POJO to be encoded
* @return byte array.
*/
byte[] encode(Object message);
}

View File

@ -0,0 +1,24 @@
package org.onlab.netty;
import java.util.concurrent.TimeUnit;
public final class SimpleClient {
private SimpleClient() {}
public static void main(String... args) throws Exception {
NettyMessagingService messaging = new TestNettyMessagingService(9081);
messaging.activate();
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
Response<String> response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World");
System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
}
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
Serializer serializer = new KryoSerializer();
this.serializer = serializer;
}
}
}

View File

@ -0,0 +1,19 @@
package org.onlab.netty;
public final class SimpleServer {
private SimpleServer() {}
public static void main(String... args) throws Exception {
NettyMessagingService server = new TestNettyMessagingService();
server.activate();
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
public static class TestNettyMessagingService extends NettyMessagingService {
protected TestNettyMessagingService() {
Serializer serializer = new KryoSerializer();
this.serializer = serializer;
}
}
}

View File

@ -0,0 +1,4 @@
/**
* Asynchronous messaging APIs implemented using the Netty framework.
*/
package org.onlab.netty;