From ee83f26c35d2874a58f52f183d90b2f3a27367b6 Mon Sep 17 00:00:00 2001 From: Aaron Dunlap Date: Mon, 14 Jan 2019 12:32:38 -0600 Subject: [PATCH] Added code to allow HOST events to be sent via Kafka Change-Id: I66e9df2fcb6cce75216aa1b3b2969cd12aec092d --- .../kafkaintegration/api/dto/OnosEvent.java | 7 +- .../converter/HostEventConverter.java | 114 ++++++++++++++++++ .../impl/EventConversionManager.java | 9 +- .../kafkaintegration/kafka/EventListener.java | 33 +++++ 4 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/HostEventConverter.java diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java index 9cdf65f278..6bd897e8aa 100644 --- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java +++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java @@ -46,6 +46,11 @@ public class OnosEvent extends AbstractEvent { /** * Signifies Link events. */ - LINK + LINK, + + /** + * Signifies Host events. + */ + HOST } } diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/HostEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/HostEventConverter.java new file mode 100644 index 0000000000..54d802b952 --- /dev/null +++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/HostEventConverter.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.kafkaintegration.converter; + +import com.google.protobuf.GeneratedMessageV3; + +import org.onlab.packet.IpAddress; +import org.onosproject.event.Event; +import org.onosproject.grpc.net.host.models.HostEnumsProto.HostEventTypeProto; +import org.onosproject.grpc.net.host.models.HostEventProto.HostNotificationProto; +import org.onosproject.grpc.net.models.HostProtoOuterClass.HostProto; +import org.onosproject.incubator.protobuf.models.net.AnnotationsTranslator; +import org.onosproject.incubator.protobuf.models.net.HostIdProtoTranslator; +import org.onosproject.incubator.protobuf.models.net.HostLocationProtoTranslator; +import org.onosproject.net.host.HostEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.stream.Collectors; + +/** + * Converts ONOS Host event message to protobuf format. + */ +public class HostEventConverter implements EventConverter { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Override + public byte[] convertToProtoMessage(Event event) { + + HostEvent hostEvent = (HostEvent) event; + + if (!hostEventTypeSupported(hostEvent)) { + log.error("Unsupported Onos Host Event {}. There is no matching" + + "proto Host Event type", hostEvent.type().toString()); + return null; + } + + return ((GeneratedMessageV3) buildHostProtoMessage(hostEvent)).toByteArray(); + + } + + /** + * Checks if the ONOS Host Event type is supported. + * + * @param event ONOS Host event + * @return true if there is a match and false otherwise + */ + private boolean hostEventTypeSupported(HostEvent event) { + HostEventTypeProto[] hostEvents = HostEventTypeProto.values(); + for (HostEventTypeProto hostEventType : hostEvents) { + if (hostEventType.name().equals(event.type().name())) { + return true; + } + } + + return false; + } + + private HostNotificationProto buildHostProtoMessage(HostEvent hostEvent) { + HostNotificationProto.Builder notificationBuilder = + HostNotificationProto.newBuilder(); + + HostProto hostCore = + HostProto.newBuilder() + .setHostId(HostIdProtoTranslator.translate(hostEvent + .subject().id())) + .setConfigured(hostEvent.subject().configured()) + .addAllIpAddresses(hostEvent.subject().ipAddresses() + .stream().map(IpAddress::toString) + .collect(Collectors.toList())) + .setLocation(HostLocationProtoTranslator.translate( + hostEvent.subject().location())) + .setVlan(hostEvent.subject().vlan().toShort()) + .putAllAnnotations(AnnotationsTranslator.asMap( + hostEvent.subject().annotations())) + .build(); + + notificationBuilder.setHostEventType(getProtoType(hostEvent)) + .setHost(hostCore); + + return notificationBuilder.build(); + } + + /** + * Retrieves the protobuf generated host event type. + * + * @param event ONOS Host Event + * @return generated Host Event Type + */ + private HostEventTypeProto getProtoType(HostEvent event) { + HostEventTypeProto protobufEventType = null; + HostEventTypeProto[] hostEvents = HostEventTypeProto.values(); + for (HostEventTypeProto hostEventType : hostEvents) { + if (hostEventType.name().equals(event.type().name())) { + protobufEventType = hostEventType; + } + } + + return protobufEventType; + } +} diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java index 5c90325941..372b57a181 100644 --- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java +++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java @@ -22,16 +22,19 @@ import org.osgi.service.component.annotations.Deactivate; import org.onosproject.event.Event; import org.onosproject.kafkaintegration.api.EventConversionService; import org.onosproject.kafkaintegration.api.dto.OnosEvent; -import org.onosproject.kafkaintegration.converter.DeviceEventConverter; import org.onosproject.kafkaintegration.converter.EventConverter; +import org.onosproject.kafkaintegration.converter.DeviceEventConverter; import org.onosproject.kafkaintegration.converter.LinkEventConverter; +import org.onosproject.kafkaintegration.converter.HostEventConverter; import org.onosproject.net.device.DeviceEvent; import org.onosproject.net.link.LinkEvent; +import org.onosproject.net.host.HostEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE; import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK; +import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.HOST; /** * Implementation of Event Conversion Service. @@ -43,11 +46,13 @@ public class EventConversionManager implements EventConversionService { private final Logger log = LoggerFactory.getLogger(getClass()); private EventConverter deviceEventConverter; private EventConverter linkEventConverter; + private EventConverter hostEventConverter; @Activate protected void activate() { deviceEventConverter = new DeviceEventConverter(); linkEventConverter = new LinkEventConverter(); + hostEventConverter = new HostEventConverter(); log.info("Started"); } @@ -63,6 +68,8 @@ public class EventConversionManager implements EventConversionService { return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event)); } else if (event instanceof LinkEvent) { return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event)); + } else if (event instanceof HostEvent) { + return new OnosEvent(HOST, hostEventConverter.convertToProtoMessage(event)); } else { throw new IllegalArgumentException("Unsupported event type"); } diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java index 18baccbfd3..91e2028891 100644 --- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java +++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java @@ -34,6 +34,9 @@ import org.onosproject.net.device.DeviceService; import org.onosproject.net.link.LinkEvent; import org.onosproject.net.link.LinkListener; import org.onosproject.net.link.LinkService; +import org.onosproject.net.host.HostEvent; +import org.onosproject.net.host.HostListener; +import org.onosproject.net.host.HostService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +47,7 @@ import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static org.onlab.util.Tools.groupedThreads; import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE; import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK; +import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.HOST; /** @@ -65,6 +69,9 @@ public class EventListener { @Reference(cardinality = ReferenceCardinality.MANDATORY) protected LinkService linkService; + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected HostService hostService; + @Reference(cardinality = ReferenceCardinality.MANDATORY) protected KafkaEventStorageService kafkaStoreService; @@ -76,6 +83,7 @@ public class EventListener { private final DeviceListener deviceListener = new InternalDeviceListener(); private final LinkListener linkListener = new InternalLinkListener(); + private final HostListener hostListener = new InternalHostListener(); protected ExecutorService eventExecutor; @@ -89,6 +97,7 @@ public class EventListener { eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log)); deviceService.addListener(deviceListener); linkService.addListener(linkListener); + hostService.addListener(hostListener); localNodeId = clusterService.getLocalNode().id(); @@ -101,6 +110,7 @@ public class EventListener { protected void deactivate() { deviceService.removeListener(deviceListener); linkService.removeListener(linkListener); + hostService.removeListener(hostListener); eventExecutor.shutdownNow(); eventExecutor = null; @@ -153,4 +163,27 @@ public class EventListener { } } + + private class InternalHostListener implements HostListener { + + @Override + public void event(HostEvent event) { + + // do not allow to proceed without leadership + NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC); + if (!Objects.equals(localNodeId, leaderNodeId)) { + log.debug("Not a Leader, cannot publish!"); + return; + } + + if (!eventSubscriptionService.getEventSubscribers(HOST).isEmpty()) { + OnosEvent onosEvent = eventConversionService.convertEvent(event); + eventExecutor.execute(() -> { + kafkaStoreService.publishEvent(onosEvent); + }); + log.debug("Pushed event {} to kafka storage", onosEvent); + } + + } + } } \ No newline at end of file