diff --git a/drivers/server/src/test/java/org/onosproject/drivers/server/RestSBControllerMock.java b/drivers/server/src/test/java/org/onosproject/drivers/server/RestSBControllerMock.java index 915c1bb38c..265432bdc6 100644 --- a/drivers/server/src/test/java/org/onosproject/drivers/server/RestSBControllerMock.java +++ b/drivers/server/src/test/java/org/onosproject/drivers/server/RestSBControllerMock.java @@ -33,8 +33,10 @@ import java.util.List; import java.util.Set; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.sse.InboundSseEvent; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThat; @@ -168,8 +170,24 @@ public class RestSBControllerMock implements RestSBController { } @Override - public T post(DeviceId device, String request, InputStream payload, - MediaType mediaType, Class responseClass) { + public T post(DeviceId device, String request, InputStream payload, + MediaType mediaType, Class responseClass) { return null; - } + } + + @Override + public void startServerSentEvents(DeviceId deviceId, String eventsUrl) { + return; + } + + @Override + public int getServerSentEvents(DeviceId deviceId, String request, + Consumer onEvent, Consumer onError) { + return 204; + } + + @Override + public int cancelServerSentEvents(DeviceId deviceId) { + return 200; + } } \ No newline at end of file diff --git a/lib/BUCK b/lib/BUCK index d93c8de5e3..a430c66712 100644 --- a/lib/BUCK +++ b/lib/BUCK @@ -780,6 +780,15 @@ remote_jar ( visibility = [ 'PUBLIC' ], ) +remote_jar ( + name = 'jersey-media-sse', + out = 'jersey-media-sse-2.26.jar', + url = 'mvn:org.glassfish.jersey.media:jersey-media-sse:jar:2.26', + sha1 = '61cfeb0df911585140f64f3369eb524e0e02f534', + maven_coords = 'org.glassfish.jersey.media:jersey-media-sse:2.26', + visibility = [ 'PUBLIC' ], +) + remote_jar ( name = 'jersey-server', out = 'jersey-server-2.26.jar', diff --git a/lib/deps.json b/lib/deps.json index 81ca104bf1..aaaa82d217 100644 --- a/lib/deps.json +++ b/lib/deps.json @@ -202,6 +202,7 @@ "jersey-container-servlet": "mvn:org.glassfish.jersey.containers:jersey-container-servlet:2.26", "jersey-container-servlet-core": "mvn:org.glassfish.jersey.containers:jersey-container-servlet-core:2.26", "jersey-media-multipart": "mvn:org.glassfish.jersey.media:jersey-media-multipart:2.26", + "jersey-media-sse": "mvn:org.glassfish.jersey.media:jersey-media-sse:2.26", "jersey-server": "mvn:org.glassfish.jersey.core:jersey-server:2.26", "jersey-hk2":"mvn:org.glassfish.jersey.inject:jersey-hk2:2.26", "jersey-test-framework-core": "mvn:org.glassfish.jersey.test-framework:jersey-test-framework-core:2.26", diff --git a/protocols/rest/api/src/main/java/org/onosproject/protocol/http/HttpSBController.java b/protocols/rest/api/src/main/java/org/onosproject/protocol/http/HttpSBController.java index ada5496ba8..0f5b32b574 100644 --- a/protocols/rest/api/src/main/java/org/onosproject/protocol/http/HttpSBController.java +++ b/protocols/rest/api/src/main/java/org/onosproject/protocol/http/HttpSBController.java @@ -18,8 +18,10 @@ package org.onosproject.protocol.http; import java.io.InputStream; import java.util.Map; +import java.util.function.Consumer; import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.InboundSseEvent; import org.onlab.packet.IpAddress; import org.onosproject.net.DeviceId; @@ -125,7 +127,8 @@ public interface HttpSBController { InputStream get(DeviceId device, String request, MediaType mediaType); /** - * Does a HTTP POST request with specified parameters to the device. + * Does a HTTP POST request with specified parameters to the device and + * extracts an object of type T from the response entity field. * * @param post return type * @param device device to make the request to @@ -138,5 +141,27 @@ public interface HttpSBController { */ T post(DeviceId device, String request, InputStream payload, MediaType mediaType, Class responseClass); + /** + * Does a HTTP GET against a Server Sent Events (SSE_INBOUND) resource on the device. + * + * This is a low level function that can take callbacks. + * For a higher level function that emits events based on this callback + * see startServerSentEvents() in the RestSBController + * + * @param deviceId device to make the request to + * @param request url of the request + * @param onEvent A consumer of inbound SSE_INBOUND events + * @param onError A consumer of inbound SSE_INBOUND errors + * @return status Commonly used status codes defined by HTTP + */ + int getServerSentEvents(DeviceId deviceId, String request, + Consumer onEvent, Consumer onError); + /** + * Cancels a Server Sent Events listener to a device. + * + * @param deviceId device to cancel the listener for + * @return status Commonly used status codes defined by HTTP + */ + int cancelServerSentEvents(DeviceId deviceId); } diff --git a/protocols/rest/api/src/main/java/org/onosproject/protocol/http/ctl/HttpSBControllerImpl.java b/protocols/rest/api/src/main/java/org/onosproject/protocol/http/ctl/HttpSBControllerImpl.java index e8fcca0c49..e4ba243445 100644 --- a/protocols/rest/api/src/main/java/org/onosproject/protocol/http/ctl/HttpSBControllerImpl.java +++ b/protocols/rest/api/src/main/java/org/onosproject/protocol/http/ctl/HttpSBControllerImpl.java @@ -45,6 +45,8 @@ import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -57,6 +59,7 @@ import java.security.cert.X509Certificate; import java.util.Base64; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkNotNull; @@ -80,6 +83,7 @@ public class HttpSBControllerImpl implements HttpSBController { private final Map deviceMap = new ConcurrentHashMap<>(); private final Map clientMap = new ConcurrentHashMap<>(); + private final Map sseEventSourceMap = new ConcurrentHashMap<>(); public Map getDeviceMap() { return deviceMap; @@ -89,6 +93,10 @@ public class HttpSBControllerImpl implements HttpSBController { return clientMap; } + public Map getSseEventSourceMap() { + return sseEventSourceMap; + } + @Override public Map getDevices() { return ImmutableMap.copyOf(deviceMap); @@ -121,6 +129,7 @@ public class HttpSBControllerImpl implements HttpSBController { public void removeDevice(DeviceId deviceId) { clientMap.remove(deviceId); deviceMap.remove(deviceId); + sseEventSourceMap.remove(deviceId); } @Override @@ -251,6 +260,58 @@ public class HttpSBControllerImpl implements HttpSBController { return response.getStatus(); } + @Override + public int getServerSentEvents(DeviceId deviceId, String request, + Consumer onEvent, + Consumer onError) { + if (deviceId == null) { + log.warn("Device ID is null", request); + return Status.PRECONDITION_FAILED.getStatusCode(); + } + + if (request == null || request.isEmpty()) { + log.warn("Request cannot be empty", request); + return Status.PRECONDITION_FAILED.getStatusCode(); + } + + if (sseEventSourceMap.containsKey(deviceId)) { + log.warn("Device", deviceId, "is already listening to an SSE stream"); + return Status.CONFLICT.getStatusCode(); + } + + WebTarget wt = getWebTarget(deviceId, request); + SseEventSource sseEventSource = SseEventSource.target(wt).build(); + sseEventSource.register(onEvent, onError); + sseEventSource.open(); + if (sseEventSource.isOpen()) { + sseEventSourceMap.put(deviceId, sseEventSource); + log.info("Opened Server Sent Events request to ", request, "on", deviceId); + while (sseEventSource.isOpen()) { + try { + Thread.sleep(1010); + System.out.println("Listening for SSEs"); + } catch (InterruptedException e) { + log.error("Error", e); + } + } + return Status.NO_CONTENT.getStatusCode(); + } else { + log.error("Unable to open Server Sent Events request to ", request, "to", deviceId); + return Status.INTERNAL_SERVER_ERROR.getStatusCode(); + } + } + + @Override + public int cancelServerSentEvents(DeviceId deviceId) { + if (sseEventSourceMap.containsKey(deviceId)) { + sseEventSourceMap.get(deviceId).close(); + sseEventSourceMap.remove(deviceId); + return Status.OK.getStatusCode(); + } else { + return Status.NOT_FOUND.getStatusCode(); + } + } + private MediaType typeOfMediaType(String type) { switch (type) { case XML: diff --git a/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBController.java b/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBController.java index 551ddcb66b..c67609a66a 100644 --- a/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBController.java +++ b/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBController.java @@ -56,4 +56,22 @@ public interface RestSBController extends HttpSBController { * @return the corresponding REST proxied device */ RestSBDevice getProxySBDevice(DeviceId deviceId); + + /** + * Call on the Rest SB interface for a device to request ServerSentEvents from events URL. + * + * These events will be converted to ONOS events and forwarded to any registered listener + * through the EventDispatcher system. Drivers can implement listeners in their + * own particular way depending on the type of data expected. + * + * To register and unregister listeners use the addListener and removeListener + * methods. These listeners will get messages from all devices. + * + * To stop a particular device's event stream use the cancelServerSentEvents + * for that device. + * + * @param deviceId the id of the device exposed to ONOS + * @param eventsUrl The resource on the device that supplies an SSE_INBOUND stream + */ + void startServerSentEvents(DeviceId deviceId, String eventsUrl); } diff --git a/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBEventListener.java b/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBEventListener.java new file mode 100644 index 0000000000..d96599c943 --- /dev/null +++ b/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBEventListener.java @@ -0,0 +1,25 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.protocol.rest; + +import org.onosproject.event.EventListener; + +/** + * Listener interface for Server Sent Events from Rest SB. + */ +public interface RestSBEventListener extends EventListener { +} diff --git a/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBServerSentEvent.java b/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBServerSentEvent.java new file mode 100644 index 0000000000..30d35ee637 --- /dev/null +++ b/protocols/rest/api/src/main/java/org/onosproject/protocol/rest/RestSBServerSentEvent.java @@ -0,0 +1,94 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.protocol.rest; + +import org.onosproject.event.AbstractEvent; +import org.onosproject.net.DeviceId; + +import javax.ws.rs.sse.InboundSseEvent; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Event received on the REST SB interface as ServerSentEvent (SSE_INBOUND). + */ +public class RestSBServerSentEvent extends AbstractEvent { + + private String id; + private String comment; + private String data; + private String name; + + /** + * SSE Event types supported. + */ + public enum Type { + SSE_INBOUND + } + + public RestSBServerSentEvent(Type type, DeviceId deviceId, InboundSseEvent sseEvent) { + super(type, deviceId); + checkNotNull(sseEvent); + data = sseEvent.readData(); + id = sseEvent.getId(); + name = sseEvent.getName(); + comment = sseEvent.getComment(); + } + + public String getData() { + return data; + } + + public String getId() { + return id; + } + + public String getComment() { + return comment; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return super.toString() + ", id=" + id + ", name=" + name + ", comment=" + comment; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RestSBServerSentEvent that = (RestSBServerSentEvent) o; + return Objects.equals(id, that.id) && + Objects.equals(comment, that.comment) && + Objects.equals(data, that.data) && + Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(id, comment, data, name); + } +} diff --git a/protocols/rest/ctl/BUCK b/protocols/rest/ctl/BUCK index 3fdfd57673..c12164dc0f 100644 --- a/protocols/rest/ctl/BUCK +++ b/protocols/rest/ctl/BUCK @@ -2,6 +2,7 @@ COMPILE_DEPS = [ '//lib:CORE_DEPS', '//lib:jersey-client', '//lib:jersey-common', + '//lib:jersey-media-sse', '//lib:httpclient-osgi', '//lib:httpcore-osgi', '//lib:javax.ws.rs-api', @@ -11,7 +12,13 @@ COMPILE_DEPS = [ '//protocols/rest/api:onos-protocols-rest-api', ] +TEST_DEPS = [ + '//lib:TEST_REST', + '//core/common:onos-core-common-tests' +] + osgi_jar_with_tests ( deps = COMPILE_DEPS, + test_deps = TEST_DEPS ) diff --git a/protocols/rest/ctl/BUILD b/protocols/rest/ctl/BUILD index f2b3d0c365..f7fd3eae45 100644 --- a/protocols/rest/ctl/BUILD +++ b/protocols/rest/ctl/BUILD @@ -2,6 +2,7 @@ COMPILE_DEPS = CORE_DEPS + [ "@jersey_client//jar", "@jersey_server//jar", "@jersey_common//jar", + "@jersey_media_sse//jar", "@jersey_security//jar", "@httpclient_osgi//jar", "@httpcore_osgi//jar", @@ -12,6 +13,11 @@ COMPILE_DEPS = CORE_DEPS + [ "//protocols/rest/api:onos-protocols-rest-api", ] +TEST_DEPS = TEST_REST + [ + "//core/common:onos-core-common-tests" +] + osgi_jar_with_tests( deps = COMPILE_DEPS, + test_deps = TEST_DEPS, ) diff --git a/protocols/rest/ctl/pom.xml b/protocols/rest/ctl/pom.xml index 5ff9e08479..e99d193268 100644 --- a/protocols/rest/ctl/pom.xml +++ b/protocols/rest/ctl/pom.xml @@ -39,6 +39,10 @@ ${project.version} bundle + + org.glassfish.jersey.core + jersey-media-sse + diff --git a/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java b/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java index 83737ca765..a527e0e624 100644 --- a/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java +++ b/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java @@ -20,15 +20,23 @@ import com.google.common.collect.ImmutableSet; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onosproject.event.EventDeliveryService; +import org.onosproject.event.ListenerRegistry; +import org.onosproject.event.ListenerService; import org.onosproject.net.DeviceId; import org.onosproject.protocol.http.ctl.HttpSBControllerImpl; import org.onosproject.protocol.rest.RestSBController; import org.onosproject.protocol.rest.RestSBDevice; +import org.onosproject.protocol.rest.RestSBEventListener; +import org.onosproject.protocol.rest.RestSBServerSentEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.sse.InboundSseEvent; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -39,15 +47,23 @@ import java.util.stream.Collectors; */ @Component(immediate = true) @Service -public class RestSBControllerImpl extends HttpSBControllerImpl implements RestSBController { +public class RestSBControllerImpl extends HttpSBControllerImpl + implements RestSBController, ListenerService { private static final Logger log = LoggerFactory.getLogger(RestSBControllerImpl.class); + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected EventDeliveryService eventDispatcher; + + protected final ListenerRegistry listenerRegistry = + new ListenerRegistry<>(); + private final Map proxiedDeviceMap = new ConcurrentHashMap<>(); @Activate public void activate() { + eventDispatcher.addSink(RestSBServerSentEvent.class, listenerRegistry); log.info("Started"); } @@ -55,6 +71,7 @@ public class RestSBControllerImpl extends HttpSBControllerImpl implements RestSB public void deactivate() { this.getClientMap().clear(); this.getDeviceMap().clear(); + this.getSseEventSourceMap().clear(); log.info("Stopped"); } @@ -118,4 +135,36 @@ public class RestSBControllerImpl extends HttpSBControllerImpl implements RestSB } } + + @Override + public void startServerSentEvents(DeviceId deviceId, String eventsUrl) { + this.getServerSentEvents(deviceId, eventsUrl, + (event) -> sendEvent(event, deviceId), + (error) -> log.error("Unable to handle {} SSEvent from {}. {}", + eventsUrl, deviceId, error)); + } + + @Override + public void addListener(RestSBEventListener listener) { + listenerRegistry.addListener(listener); + } + + @Override + public void removeListener(RestSBEventListener listener) { + listenerRegistry.removeListener(listener); + } + + /** + * Safely posts the specified event to the local event dispatcher. + * If there is no event dispatcher or if the event is null, this method + * is a noop. + * @param sseEvent event to be posted; may be null + * @param deviceId the device that sent the event + */ + protected void sendEvent(InboundSseEvent sseEvent, DeviceId deviceId) { + if (sseEvent != null && eventDispatcher != null) { + eventDispatcher.post(new RestSBServerSentEvent( + RestSBServerSentEvent.Type.SSE_INBOUND, deviceId, sseEvent)); + } + } } diff --git a/protocols/rest/ctl/src/test/java/org/onosproject/protocol/rest/ctl/RestSBControllerImplTest.java b/protocols/rest/ctl/src/test/java/org/onosproject/protocol/rest/ctl/RestSBControllerImplTest.java index 8f90057404..fa9f952354 100644 --- a/protocols/rest/ctl/src/test/java/org/onosproject/protocol/rest/ctl/RestSBControllerImplTest.java +++ b/protocols/rest/ctl/src/test/java/org/onosproject/protocol/rest/ctl/RestSBControllerImplTest.java @@ -16,32 +16,156 @@ package org.onosproject.protocol.rest.ctl; +import org.apache.commons.io.IOUtils; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.glassfish.jersey.test.TestProperties; import org.junit.Before; import org.junit.Test; +import org.onlab.junit.TestUtils; import org.onlab.packet.IpAddress; import org.onosproject.protocol.rest.DefaultRestSBDevice; import org.onosproject.protocol.rest.RestSBDevice; +import org.onosproject.common.event.impl.TestEventDispatcher; +import org.onosproject.protocol.rest.RestSBEventListener; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.PATCH; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static org.junit.Assert.*; /** * Basic testing for RestSBController. */ -public class RestSBControllerImplTest { +public class RestSBControllerImplTest extends JerseyTest { + private static final String SAMPLE_PAYLOAD = "{ \"msg\": \"ONOS Rocks!\" }"; RestSBControllerImpl controller; RestSBDevice device1; RestSBDevice device2; + /** + * Mockup of an arbitrary device. + */ + @Path("testme") + public static class HelloResource { + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response post(InputStream payload) throws IOException { + String responseText = IOUtils.toString(payload, StandardCharsets.UTF_8); + if (responseText.equalsIgnoreCase(SAMPLE_PAYLOAD)) { + return Response.ok().build(); + } + return Response.status(Response.Status.EXPECTATION_FAILED).build(); + } + + @POST + @Path("testpostreturnstring") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response postReturnString(InputStream payload) throws IOException { + String responseText = IOUtils.toString(payload, StandardCharsets.UTF_8); + if (responseText.equalsIgnoreCase(SAMPLE_PAYLOAD)) { + return Response.ok().entity("OK").build(); + } + return Response.status(Response.Status.EXPECTATION_FAILED).entity("Failed").build(); + } + + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response put(InputStream payload) throws IOException { + String responseText = IOUtils.toString(payload, StandardCharsets.UTF_8); + if (responseText.equalsIgnoreCase(SAMPLE_PAYLOAD)) { + return Response.ok().build(); + } + return Response.status(Response.Status.EXPECTATION_FAILED).build(); + } + + @PATCH + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response patch(InputStream payload) throws IOException { + String responseText = IOUtils.toString(payload, StandardCharsets.UTF_8); + if (responseText.equalsIgnoreCase(SAMPLE_PAYLOAD)) { + return Response.ok().build(); + } + return Response.status(Response.Status.EXPECTATION_FAILED).build(); + } + + @GET + public String getHello() { + return SAMPLE_PAYLOAD; + } + + @DELETE + public int delete() { + return Response.Status.NO_CONTENT.getStatusCode(); + } + + @GET + @Path("server-sent-events") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) throws InterruptedException { + new Thread(() -> { + try { + for (int i = 0; i < 10; i++) { + // ... code that waits 0.1 second + Thread.sleep(100L); + final OutboundSseEvent event = sse.newEventBuilder() + .id(String.valueOf(i)) + .name("message-to-rest-sb") + .data(String.class, "Test message " + i + "!") + .build(); + eventSink.send(event); + System.out.println("Message " + i + " sent"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + } + + } + + @Override + protected Application configure() { + set(TestProperties.CONTAINER_PORT, 8080); + return new ResourceConfig(HelloResource.class); + } @Before - public void setUp() { + public void setUpTest() { controller = new RestSBControllerImpl(); + TestUtils.setField(controller, "eventDispatcher", new TestEventDispatcher()); controller.activate(); device1 = new DefaultRestSBDevice(IpAddress.valueOf("127.0.0.1"), 8080, "foo", "bar", "http", null, true); device2 = new DefaultRestSBDevice(IpAddress.valueOf("127.0.0.2"), 8080, "foo1", "bar2", "http", null, true); controller.addDevice(device1); + } @Test @@ -56,4 +180,134 @@ public class RestSBControllerImplTest { controller.removeDevice(device2.deviceId()); assertFalse("Device2 not removed", controller.getDevices().containsValue(device2)); } + + /** + * Tests the post function of the REST SB Controller. + */ + @Test + public void testPost() { + InputStream payload = new ByteArrayInputStream(SAMPLE_PAYLOAD.getBytes(StandardCharsets.UTF_8)); + int response = controller.post(device1.deviceId(), "/testme", payload, MediaType.APPLICATION_JSON_TYPE); + assertEquals(HttpURLConnection.HTTP_OK, response); + } + + /** + * Tests the put function of the REST SB Controller. + */ + @Test + public void testPut() { + InputStream payload = new ByteArrayInputStream(SAMPLE_PAYLOAD.getBytes(StandardCharsets.UTF_8)); + int response = controller.put(device1.deviceId(), "/testme", payload, MediaType.APPLICATION_JSON_TYPE); + assertEquals(HttpURLConnection.HTTP_OK, response); + } + + @Test + public void testPatch() { + InputStream payload = new ByteArrayInputStream(SAMPLE_PAYLOAD.getBytes(StandardCharsets.UTF_8)); + int response = controller.patch(device1.deviceId(), "/testme", payload, MediaType.APPLICATION_JSON_TYPE); + assertEquals(HttpURLConnection.HTTP_OK, response); + } + + /** + * Tests the delete function of the REST SB Controller. + */ + @Test + public void testDelete() { + int response = controller.delete(device1.deviceId(), "/testme", null, null); + assertEquals(HttpURLConnection.HTTP_OK, response); + } + + /** + * Tests the get function of the REST SB Controller. + */ + @Test + public void testGet() throws IOException { + InputStream payload = controller.get(device1.deviceId(), "/testme", MediaType.APPLICATION_JSON_TYPE); + String responseText = IOUtils.toString(payload, StandardCharsets.UTF_8); + assertEquals(SAMPLE_PAYLOAD, responseText); + } + + /** + * Tests the post function of the REST SB Controller. + */ + @Test + public void testPostReturnString() { + InputStream payload = new ByteArrayInputStream(SAMPLE_PAYLOAD.getBytes(StandardCharsets.UTF_8)); + String result = controller.post(device1.deviceId(), "/testme/testpostreturnstring", + payload, MediaType.APPLICATION_JSON_TYPE, String.class); + assertEquals("OK", result); + } + + /** + * Tests the low level getServerSentEvents function of the REST SB Controller. + * + * Note: If the consumer throws an error it will not be propagated back up + * to here - instead the source will go in to error and no more callbacks + * will be executed + */ + @Test + public void testGetServerSentEvents() { + Consumer sseEventConsumer = (event) -> { + System.out.println("ServerSentEvent received: " + event); + assertEquals("message-to-rest-sb", event.getName()); + // Just to show it works we stop before the last message is sent + if (Integer.parseInt(event.getId()) == 8) { + controller.cancelServerSentEvents(device1.deviceId()); + } + }; + + Consumer sseError = (error) -> { + System.err.println(error); + controller.cancelServerSentEvents(device1.deviceId()); + //fail(error.toString()); //Does nothing as it's in lambda scope + }; + + int response = controller.getServerSentEvents(device1.deviceId(), + "/testme/server-sent-events", + sseEventConsumer, + sseError + ); + assertEquals(204, response); + } + + /** + * Test of cancelling of events from a device - in this case there should not be any. + */ + @Test + public void testCancelServerSentEvents() { + assertEquals(404, controller.cancelServerSentEvents(device1.deviceId())); + } + + /** + * Test the high level API for Server Sent Events. + */ + @Test + public void testStartServerSentEvents() { + AtomicInteger listener1Count = new AtomicInteger(); + AtomicInteger listener2Count = new AtomicInteger(); + + RestSBEventListener listener1 = event -> { + System.out.println("Event on Lsnr1: " + event); + listener1Count.incrementAndGet(); + if (Integer.parseInt(event.getId()) == 8) { + controller.cancelServerSentEvents(device1.deviceId()); + } + }; + + RestSBEventListener listener2 = event -> { + listener2Count.incrementAndGet(); + System.out.println("Event on Lsnr2: " + event); + }; + + controller.addListener(listener1); + controller.addListener(listener2); + + controller.startServerSentEvents(device1.deviceId(), "/testme/server-sent-events"); + + controller.removeListener(listener1); + controller.removeListener(listener2); + + assertEquals(9, listener1Count.get()); + assertEquals(9, listener2Count.get()); + } } \ No newline at end of file diff --git a/tools/build/bazel/generate_workspace.bzl b/tools/build/bazel/generate_workspace.bzl index d96a4fecdb..d7891a17d8 100644 --- a/tools/build/bazel/generate_workspace.bzl +++ b/tools/build/bazel/generate_workspace.bzl @@ -510,6 +510,12 @@ def generated_maven_jars(): sha1 = "c7ea0a5819e4688317024c9f4b6e3de54f9d0f3d", ) + native.maven_jar( + name = "jersey_media_sse", + artifact = "org.glassfish.jersey.media:jersey-media-sse:2.26", + sha1 = "61cfeb0df911585140f64f3369eb524e0e02f534", + ) + native.maven_jar( name = "jersey_server", artifact = "org.glassfish.jersey.core:jersey-server:2.26", @@ -1658,6 +1664,12 @@ def generated_java_libraries(): exports = ["@jersey_media_multipart//jar"], ) + native.java_library( + name = "jersey_media_sse", + visibility = ["//visibility:public"], + exports = ["@jersey_media_sse//jar"], + ) + native.java_library( name = "jersey_server", visibility = ["//visibility:public"], @@ -2478,6 +2490,7 @@ artifact_map["@jersey_container_jetty_http//jar"] = "mvn:org.glassfish.jersey.co artifact_map["@jersey_container_servlet//jar"] = "mvn:org.glassfish.jersey.containers:jersey-container-servlet:jar:2.26" artifact_map["@jersey_container_servlet_core//jar"] = "mvn:org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.26" artifact_map["@jersey_media_multipart//jar"] = "mvn:org.glassfish.jersey.media:jersey-media-multipart:jar:2.26" +artifact_map["@jersey_media_sse//jar"] = "mvn:org.glassfish.jersey.media:jersey-media-sse:jar:2.26" artifact_map["@jersey_server//jar"] = "mvn:org.glassfish.jersey.core:jersey-server:jar:2.26" artifact_map["@jersey_hk2//jar"] = "mvn:org.glassfish.jersey.inject:jersey-hk2:jar:2.26" artifact_map["@jersey_test_framework_core//jar"] = "mvn:org.glassfish.jersey.test-framework:jersey-test-framework-core:jar:NON-OSGI:2.26"