Added code to allow HOST events to be sent via Kafka

Change-Id: I66e9df2fcb6cce75216aa1b3b2969cd12aec092d
This commit is contained in:
Aaron Dunlap 2019-01-14 12:32:38 -06:00
parent 5d65d11a75
commit ee83f26c35
4 changed files with 161 additions and 2 deletions

View File

@ -46,6 +46,11 @@ public class OnosEvent extends AbstractEvent<OnosEvent.Type, byte[]> {
/**
* Signifies Link events.
*/
LINK
LINK,
/**
* Signifies Host events.
*/
HOST
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2016-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.converter;
import com.google.protobuf.GeneratedMessageV3;
import org.onlab.packet.IpAddress;
import org.onosproject.event.Event;
import org.onosproject.grpc.net.host.models.HostEnumsProto.HostEventTypeProto;
import org.onosproject.grpc.net.host.models.HostEventProto.HostNotificationProto;
import org.onosproject.grpc.net.models.HostProtoOuterClass.HostProto;
import org.onosproject.incubator.protobuf.models.net.AnnotationsTranslator;
import org.onosproject.incubator.protobuf.models.net.HostIdProtoTranslator;
import org.onosproject.incubator.protobuf.models.net.HostLocationProtoTranslator;
import org.onosproject.net.host.HostEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.stream.Collectors;
/**
* Converts ONOS Host event message to protobuf format.
*/
public class HostEventConverter implements EventConverter {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public byte[] convertToProtoMessage(Event<?, ?> event) {
HostEvent hostEvent = (HostEvent) event;
if (!hostEventTypeSupported(hostEvent)) {
log.error("Unsupported Onos Host Event {}. There is no matching"
+ "proto Host Event type", hostEvent.type().toString());
return null;
}
return ((GeneratedMessageV3) buildHostProtoMessage(hostEvent)).toByteArray();
}
/**
* Checks if the ONOS Host Event type is supported.
*
* @param event ONOS Host event
* @return true if there is a match and false otherwise
*/
private boolean hostEventTypeSupported(HostEvent event) {
HostEventTypeProto[] hostEvents = HostEventTypeProto.values();
for (HostEventTypeProto hostEventType : hostEvents) {
if (hostEventType.name().equals(event.type().name())) {
return true;
}
}
return false;
}
private HostNotificationProto buildHostProtoMessage(HostEvent hostEvent) {
HostNotificationProto.Builder notificationBuilder =
HostNotificationProto.newBuilder();
HostProto hostCore =
HostProto.newBuilder()
.setHostId(HostIdProtoTranslator.translate(hostEvent
.subject().id()))
.setConfigured(hostEvent.subject().configured())
.addAllIpAddresses(hostEvent.subject().ipAddresses()
.stream().map(IpAddress::toString)
.collect(Collectors.toList()))
.setLocation(HostLocationProtoTranslator.translate(
hostEvent.subject().location()))
.setVlan(hostEvent.subject().vlan().toShort())
.putAllAnnotations(AnnotationsTranslator.asMap(
hostEvent.subject().annotations()))
.build();
notificationBuilder.setHostEventType(getProtoType(hostEvent))
.setHost(hostCore);
return notificationBuilder.build();
}
/**
* Retrieves the protobuf generated host event type.
*
* @param event ONOS Host Event
* @return generated Host Event Type
*/
private HostEventTypeProto getProtoType(HostEvent event) {
HostEventTypeProto protobufEventType = null;
HostEventTypeProto[] hostEvents = HostEventTypeProto.values();
for (HostEventTypeProto hostEventType : hostEvents) {
if (hostEventType.name().equals(event.type().name())) {
protobufEventType = hostEventType;
}
}
return protobufEventType;
}
}

View File

@ -22,16 +22,19 @@ import org.osgi.service.component.annotations.Deactivate;
import org.onosproject.event.Event;
import org.onosproject.kafkaintegration.api.EventConversionService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
import org.onosproject.kafkaintegration.converter.EventConverter;
import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
import org.onosproject.kafkaintegration.converter.LinkEventConverter;
import org.onosproject.kafkaintegration.converter.HostEventConverter;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.host.HostEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.HOST;
/**
* Implementation of Event Conversion Service.
@ -43,11 +46,13 @@ public class EventConversionManager implements EventConversionService {
private final Logger log = LoggerFactory.getLogger(getClass());
private EventConverter deviceEventConverter;
private EventConverter linkEventConverter;
private EventConverter hostEventConverter;
@Activate
protected void activate() {
deviceEventConverter = new DeviceEventConverter();
linkEventConverter = new LinkEventConverter();
hostEventConverter = new HostEventConverter();
log.info("Started");
}
@ -63,6 +68,8 @@ public class EventConversionManager implements EventConversionService {
return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event));
} else if (event instanceof LinkEvent) {
return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event));
} else if (event instanceof HostEvent) {
return new OnosEvent(HOST, hostEventConverter.convertToProtoMessage(event));
} else {
throw new IllegalArgumentException("Unsupported event type");
}

View File

@ -34,6 +34,9 @@ import org.onosproject.net.device.DeviceService;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,6 +47,7 @@ import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.HOST;
/**
@ -65,6 +69,9 @@ public class EventListener {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected LinkService linkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected KafkaEventStorageService kafkaStoreService;
@ -76,6 +83,7 @@ public class EventListener {
private final DeviceListener deviceListener = new InternalDeviceListener();
private final LinkListener linkListener = new InternalLinkListener();
private final HostListener hostListener = new InternalHostListener();
protected ExecutorService eventExecutor;
@ -89,6 +97,7 @@ public class EventListener {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
hostService.addListener(hostListener);
localNodeId = clusterService.getLocalNode().id();
@ -101,6 +110,7 @@ public class EventListener {
protected void deactivate() {
deviceService.removeListener(deviceListener);
linkService.removeListener(linkListener);
hostService.removeListener(hostListener);
eventExecutor.shutdownNow();
eventExecutor = null;
@ -153,4 +163,27 @@ public class EventListener {
}
}
private class InternalHostListener implements HostListener {
@Override
public void event(HostEvent event) {
// do not allow to proceed without leadership
NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
if (!Objects.equals(localNodeId, leaderNodeId)) {
log.debug("Not a Leader, cannot publish!");
return;
}
if (!eventSubscriptionService.getEventSubscribers(HOST).isEmpty()) {
OnosEvent onosEvent = eventConversionService.convertEvent(event);
eventExecutor.execute(() -> {
kafkaStoreService.publishEvent(onosEvent);
});
log.debug("Pushed event {} to kafka storage", onosEvent);
}
}
}
}