Larry B. - 1 month ago
Java Question

Get JAX-RS AsyncResponse, but suspend later

Consider the following code to listen for an update with long-polling:

Map<String, List<AsyncResponse>> tagMap = new ConcurrentGoodStuff();

// This endpoint listens for notifications of the tag
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@GET
@Path("listen/{tag}")
public void listenForUpdates(
        @PathParam("tag") final String tag,
        @Suspended final AsyncResponse response) {
    tagMap.get(tag).add(response);
}

// This endpoint is for push-style notifications
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@PUT
@Path("update/{tag}/{value}")
public Response updateTag(
        @PathParam("tag") final String tag,
        @PathParam("value") final String value) {
    for (AsyncResponse response : tagMap.get(tag)) {
        // Resumes all previously suspended responses
        response.resume(value);
    }
    return Response.ok("cool whatever").build();
}


The client adds a listener with the normal Jersey client's AsyncInvoker, calls the asynchronous task, and then another task calls the update method.

When I'm testing this, I run into a race condition. Right after I register the listener asynchronously through listenForUpdates(), I make a synchronous update through updateTag(). But the update runs before the listener is added, so the asynchronous response is never resumed.
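To make the failure mode concrete, here is a minimal sketch of the losing order using only the JDK. It is not the JAX-RS code above: CompletableFuture<String> is an assumed stand-in for a suspended AsyncResponse, and LostUpdateSketch, listen and update are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical model of the resource: a CompletableFuture stands in for AsyncResponse.
final class LostUpdateSketch {
    private final Map<String, List<CompletableFuture<String>>> tagMap = new ConcurrentHashMap<>();

    // Models listen/{tag}: remember the pending response for this tag.
    CompletableFuture<String> listen(String tag) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        tagMap.computeIfAbsent(tag, t -> new CopyOnWriteArrayList<>()).add(pending);
        return pending;
    }

    // Models update/{tag}/{value}: resume whoever is registered right now.
    // Returns the number of listeners that were present.
    int update(String tag, String value) {
        List<CompletableFuture<String>> listeners = tagMap.getOrDefault(tag, List.of());
        for (CompletableFuture<String> pending : listeners) {
            pending.complete(value);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        LostUpdateSketch hub = new LostUpdateSketch();
        int resumed = hub.update("news", "v1");              // the update wins the race
        CompletableFuture<String> late = hub.listen("news"); // the listener arrives too late
        System.out.println(resumed + " " + late.isDone());   // prints "0 false"
    }
}
```

The update finds nobody to resume, and the late listener stays pending until some later update arrives, which matches the symptom described above.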

A solution to this is to call the suspend() method on the response after adding it to the listeners. But it's not clear how to do that, given that @Suspended provides an already-suspended AsyncResponse object. What should I do so that the async response is suspended only after adding it to the listeners? Will that actually call the suspend method? How can I get this to work with the Jersey async client, or should I use a different long-polling client?

For solutions, I'm open to different libraries, like Atmosphere or Guava. I am not open to adding a Thread.sleep() in my test, since that is an intermittent failure waiting to happen.

Answer

I ended up using RxJava, but not before coming up with a just-as-good solution using BlockingQueue instead of List in the Map. It goes something like this:

ConcurrentMap<String, BlockingQueue<AsyncResponse>> tagMap = new ConcurrentGoodStuff();

// This endpoint initialises the listener queue for the tag.
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@GET
@Path("initListen/{tag}")
public void initListen(
        @PathParam("tag") final String tag) {
    tagMap.putIfAbsent(tag, new LinkedBlockingQueue<>());
}

// This endpoint listens for notifications of the tag
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@GET
@Path("listen/{tag}")
public void listenForUpdates(
        @PathParam("tag") final String tag,
        @Suspended final AsyncResponse response) {
    BlockingQueue<AsyncResponse> responses = tagMap.get(tag);

    if (responses != null) {
        responses.add(response);
    }
}

// This endpoint is for push-style notifications
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@PUT
@Path("update/{tag}/{value}")
public Response updateTag(
        @PathParam("tag") final String tag,
        @PathParam("value") final String value) {
    BlockingQueue<AsyncResponse> responses = tagMap.get(tag);

    if (responses == null) {
        return Response.noContent().build();
    }
    if (responses.isEmpty()) {
        // Block-wait for an async listener
        try {
            // poll() belongs to the tag's queue, not to the map
            AsyncResponse response = responses.poll(15, TimeUnit.SECONDS);

            if (response == null) {
                return Response.noContent().build();
            }

            response.resume(value);
        } catch (InterruptedException e) {
            return Response.noContent().build();
        }
    } else {
        for (AsyncResponse response : responses) {
            // Resumes all previously suspended responses
            response.resume(value);
        }
    }
    return Response.ok("cool whatever").build();
}

I haven't tested this exact code, but I used some version of it in the past. As long as you call the initListen endpoint synchronously first, you can call the asynchronous listen endpoint and then the synchronous update endpoint and there won't be any significant race condition.
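The ordering argument can be checked without a JAX-RS container. The following is only a sketch under assumptions: CompletableFuture<String> stands in for the suspended AsyncResponse, and TagHandshakeSketch, listen, update and runHandshake are hypothetical names, not part of Jersey. Completing an unknown tag exceptionally is a choice made for this sketch; the endpoint above simply ignores that case.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the three endpoints; a CompletableFuture stands in for AsyncResponse.
final class TagHandshakeSketch {
    private final ConcurrentMap<String, BlockingQueue<CompletableFuture<String>>> tagMap =
            new ConcurrentHashMap<>();

    // Models initListen/{tag}: create the queue before anyone listens or updates.
    void initListen(String tag) {
        tagMap.putIfAbsent(tag, new LinkedBlockingQueue<>());
    }

    // Models listen/{tag}: park the pending response in the tag's queue.
    CompletableFuture<String> listen(String tag) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        BlockingQueue<CompletableFuture<String>> queue = tagMap.get(tag);
        if (queue == null) {
            pending.completeExceptionally(new IllegalStateException("tag not initialised"));
        } else {
            queue.add(pending);
        }
        return pending;
    }

    // Models update/{tag}/{value}: wait up to the timeout for one listener, then resume it.
    boolean update(String tag, String value, long timeout, TimeUnit unit) {
        BlockingQueue<CompletableFuture<String>> queue = tagMap.get(tag);
        if (queue == null) {
            return false;
        }
        try {
            CompletableFuture<String> pending = queue.poll(timeout, unit);
            return pending != null && pending.complete(value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts the update on another thread while the listener registers on this one.
    // Whichever side gets there first, the listener ends up with the value.
    static String runHandshake() {
        TagHandshakeSketch hub = new TagHandshakeSketch();
        hub.initListen("news");
        CompletableFuture<Boolean> updated =
                CompletableFuture.supplyAsync(() -> hub.update("news", "v1", 5, TimeUnit.SECONDS));
        CompletableFuture<String> listener = hub.listen("news");
        return updated.join() ? listener.join() : null;
    }

    public static void main(String[] args) {
        System.out.println(runHandshake()); // prints "v1"
    }
}
```

If the update arrives first it blocks in poll() until the listener is queued. If the listener arrives first, poll() returns immediately. In both orders the listener is resumed, as long as it registers within the timeout.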

There is still a slight race condition in the update endpoint, but it's minor. The responses queue could be emptied while it is being iterated, or it could be modified by several sources at once. To alleviate this, I've used the drainTo(Collection) method to move the queued responses into a collection instantiated per request. This still does not handle the case where multiple clients update the same tag of listeners at the same time, but I do not need that use case.
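Roughly, the drainTo variant could look like the sketch below. It is an illustration rather than the code I actually ran: CompletableFuture<String> again stands in for AsyncResponse, and DrainSketch and resumeAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper; a CompletableFuture stands in for a suspended AsyncResponse.
final class DrainSketch {
    // Moves every pending listener into a per-request list, then resumes each one.
    // Returns how many listeners were drained from the queue.
    static int resumeAll(BlockingQueue<CompletableFuture<String>> queue, String value) {
        List<CompletableFuture<String>> batch = new ArrayList<>();
        queue.drainTo(batch); // removes the elements from the queue as it copies them
        for (CompletableFuture<String> pending : batch) {
            pending.complete(value);
        }
        return batch.size();
    }

    public static void main(String[] args) {
        BlockingQueue<CompletableFuture<String>> queue = new LinkedBlockingQueue<>();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        queue.add(a);
        queue.add(b);
        // prints "2 v1 true"
        System.out.println(resumeAll(queue, "v1") + " " + a.join() + " " + queue.isEmpty());
    }
}
```

Because the drained responses live in a list owned by the current request, the loop no longer iterates a queue that other requests may be changing, and each response is handed to exactly one update.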
