diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java
new file mode 100644
index 0000000000..bf1fbf8398
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java
@@ -0,0 +1,136 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api.dto;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+/**
+ * Representation of a subscription to an event type.
+ *
+ */
+public final class DefaultEventSubscriber implements EventSubscriber {
+ private final String appName;
+ private final EventSubscriberGroupId subscriberGroupId;
+ private final Type eventType;
+
+ /**
+ * Creates a new Event Subscriber.
+ *
+ * @param name Application Name
+ * @param groupId Subscriber group id of the application
+ * @param eventType ONOS event type
+ */
+ public DefaultEventSubscriber(String name, EventSubscriberGroupId groupId,
+ Type eventType) {
+ this.appName = checkNotNull(name);
+ this.subscriberGroupId = checkNotNull(groupId);
+ this.eventType = checkNotNull(eventType);
+ }
+
+ @Override
+ public String appName() {
+ return appName;
+ }
+
+ @Override
+ public EventSubscriberGroupId subscriberGroupId() {
+ return subscriberGroupId;
+ }
+
+ @Override
+ public Type eventType() {
+ return eventType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(appName, subscriberGroupId, eventType);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof DefaultEventSubscriber) {
+ DefaultEventSubscriber sub = (DefaultEventSubscriber) o;
+ if (sub.appName.equals(appName)
+ && sub.subscriberGroupId.equals(subscriberGroupId)
+ && sub.eventType.equals(eventType)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("appName", appName)
+ .addValue(subscriberGroupId.toString())
+ .add("eventType", eventType).toString();
+ }
+ /**
+ * To create an instance of the builder.
+ *
+ * @return instance of builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+ /**
+ * Builder class for Event subscriber.
+ */
+ public static final class Builder implements EventSubscriber.Builder {
+ private String appName;
+ private EventSubscriberGroupId subscriberGroupId;
+ private Type eventType;
+
+ @Override
+ public Builder setAppName(String appName) {
+ this.appName = appName;
+ return this;
+ }
+
+ @Override
+ public Builder setSubscriberGroupId(EventSubscriberGroupId
+ subscriberGroupId) {
+ this.subscriberGroupId = subscriberGroupId;
+ return this;
+ }
+
+ @Override
+ public Builder setEventType(Type eventType) {
+ this.eventType = eventType;
+ return this;
+ }
+
+ @Override
+ public EventSubscriber build() {
+ checkNotNull(appName, "App name cannot be null");
+ checkNotNull(subscriberGroupId, "Subscriber group ID cannot " +
+ "be " +
+ "null");
+ checkNotNull(eventType, "Event type cannot be null");
+
+ return new DefaultEventSubscriber(appName,
+ subscriberGroupId,
+ eventType);
+ }
+ }
+
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
index 95647e7d63..3de8199997 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
@@ -13,87 +13,42 @@
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api.dto;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Objects;
-
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
/**
- * Representation of a subscription to an event type.
- *
+ * Abstraction of subscription to an event type.
*/
-public final class EventSubscriber {
- private final String appName;
- private final EventSubscriberGroupId subscriberGroupId;
- private final Type eventType;
+public interface EventSubscriber {
+ /**
+ * Returns the application name.
+ *
+ * @return application name.
+ */
+ String appName();
/**
- * Creates a new Event Subscriber.
- *
- * @param name Application Name
- * @param groupId Subscriber group id of the application
- * @param eventType ONOS event type
+ * Returns the subscriber group ID.
+ * @return subscriber group ID.
*/
- public EventSubscriber(String name, EventSubscriberGroupId groupId,
- Type eventType) {
- this.appName = checkNotNull(name);
- this.subscriberGroupId = checkNotNull(groupId);
- this.eventType = checkNotNull(eventType);
- }
-
- /**
- * Returns the Application Name.
- *
- * @return application name
- */
- public String appName() {
- return appName;
- }
-
- /**
- * Returns the Subscriber Group Id.
- *
- * @return Subscriber Group Id
- */
- public EventSubscriberGroupId subscriberGroupId() {
- return subscriberGroupId;
- }
+ EventSubscriberGroupId subscriberGroupId();
/**
* Returns the Event type.
*
* @return ONOS Event Type
*/
- public Type eventType() {
- return eventType;
- }
+ Type eventType();
- @Override
- public int hashCode() {
- return Objects.hash(appName, subscriberGroupId, eventType);
- }
+ /**
+ * An event subscriber builder.
+ */
+ interface Builder {
+ Builder setAppName(String appName);
- @Override
- public boolean equals(Object o) {
- if (o instanceof EventSubscriber) {
- EventSubscriber sub = (EventSubscriber) o;
- if (sub.appName.equals(appName)
- && sub.subscriberGroupId.equals(subscriberGroupId)
- && sub.eventType.equals(eventType)) {
- return true;
- }
- }
+ Builder setSubscriberGroupId(EventSubscriberGroupId subscriberGroupId);
- return false;
- }
+ Builder setEventType(Type eventType);
- @Override
- public String toString() {
- return toStringHelper(this).add("appName", appName)
- .addValue(subscriberGroupId.toString())
- .add("eventType", eventType).toString();
+ EventSubscriber build();
}
}
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
index 4e1d5358ed..20b9427d5e 100644
--- a/apps/kafka-integration/app/pom.xml
+++ b/apps/kafka-integration/app/pom.xml
@@ -166,6 +166,7 @@
com.fasterxml.jackson.core,
org.onlab.packet.*,
org.onosproject.*,
+ org.onlab.util.*,
com.google.common.*
${web.context}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
index 6e1a0e84d0..e2e8e57c6c 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
@@ -19,6 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -33,8 +36,8 @@ import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
@@ -98,7 +101,12 @@ public class EventExporterManager implements EventExporterService {
.>consistentMapBuilder()
.withName(SUBSCRIBED_APPS)
.withSerializer(Serializer.using(KryoNamespaces.API,
- EventSubscriber.class))
+ EventSubscriber.class,
+ OnosEvent.class,
+ OnosEvent.Type.class,
+ DefaultEventSubscriber.class,
+ EventSubscriberGroupId.class,
+ UUID.class))
.build().asJavaMap();
log.info("Started");
@@ -119,7 +127,6 @@ public class EventExporterManager implements EventExporterService {
return registeredApps.computeIfAbsent(externalAppId,
(key) -> new EventSubscriberGroupId(UUID
.randomUUID()));
-
}
@Override
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
new file mode 100644
index 0000000000..47098fa2da
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.codec.CodecService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.rest.SubscriberCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the JSON codec brokering service for Kafka app.
+ */
+@Component(immediate = true)
+public class KafkaCodecRegistrator {
+ private static Logger log = LoggerFactory.getLogger(KafkaCodecRegistrator
+ .class);
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CodecService codecService;
+
+ @Activate
+ public void activate() {
+ codecService.registerCodec(EventSubscriber.class, new SubscriberCodec());
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
index 520976af42..0456362b5d 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
@@ -28,7 +28,6 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.onosproject.codec.JsonCodec;
import org.onosproject.kafkaintegration.api.EventExporterService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
@@ -54,9 +53,10 @@ public class EventExporterWebResource extends AbstractWebResource {
"De-Registered Listener successfully";
public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
"Event Registration successfull";
+ public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
+ "Event subscription unsuccessful";
public static final String EVENT_SUBSCRIPTION_REMOVED =
"Event De-Registration successfull";
-
/**
* Registers a listener for ONOS Events.
*
@@ -95,7 +95,7 @@ public class EventExporterWebResource extends AbstractWebResource {
EventExporterService service = get(EventExporterService.class);
service.unregisterListener(appName);
-
+ log.info("Unregistered app {}", appName);
return ok(DEREGISTRATION_SUCCESSFUL).build();
}
@@ -107,6 +107,7 @@ public class EventExporterWebResource extends AbstractWebResource {
* @onos.rsModel KafkaSubscription
*/
@POST
+ @Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("subscribe")
public Response subscribe(InputStream input) {
@@ -136,11 +137,10 @@ public class EventExporterWebResource extends AbstractWebResource {
ObjectMapper mapper = new ObjectMapper();
ObjectNode node = (ObjectNode) mapper.readTree(input);
-
checkNotNull(node, JSON_NOT_NULL);
-
- JsonCodec codec = codec(EventSubscriber.class);
- return codec.decode(node, this);
+ EventSubscriber codec = codec(EventSubscriber.class).decode(node, this);
+ checkNotNull(codec, JSON_NOT_NULL);
+ return codec;
}
/**
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
index f7a02b42eb..2876ca1826 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
@@ -20,10 +20,10 @@ import java.util.UUID;
import org.onosproject.codec.CodecContext;
import org.onosproject.codec.JsonCodec;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@@ -47,13 +47,18 @@ public final class SubscriberCodec extends JsonCodec {
@Override
public EventSubscriber decode(ObjectNode json, CodecContext context) {
- String name = json.path(NAME).asText();
- String groupId = json.path(GROUP_ID).asText();
- EventSubscriberGroupId subscriberGroupId = new EventSubscriberGroupId(UUID
- .fromString(groupId));
- String eventType = json.path(EVENT_TYPE).asText();
- return new EventSubscriber(name, subscriberGroupId,
- Type.valueOf(eventType));
+ EventSubscriber.Builder resultBuilder = new DefaultEventSubscriber
+ .Builder();
+ String appName = json.get(NAME).asText();
+ resultBuilder.setAppName(appName);
+
+ String subscriberGroupId = json.get(GROUP_ID).asText();
+ resultBuilder.setSubscriberGroupId(new EventSubscriberGroupId(UUID.
+ fromString(subscriberGroupId)));
+
+ String eventType = json.get(EVENT_TYPE).asText();
+ resultBuilder.setEventType(Type.valueOf(eventType));
+ return resultBuilder.build();
}
-}
+}
\ No newline at end of file