xtiger - 5 months ago
Java Question

Vert.x throws IllegalStateException: Response has already been written

I am building a REST API that collects data from other APIs, applies some logic to it, and sends the result back to the client:

My main class:

public class Main {

public static void main(String[] args) {

Vertx.vertx().deployVerticle(RestVerticle.class.getName());
}
}


This is my RestVerticle:

public class RestVerticle extends AbstractVerticle {

public static final String API_V1 = "/api/v1";

private Map<String, JsonObject> products = new HashMap<>();

@Override
public void start() {

Router router = Router.router(vertx);

router.route().handler(BodyHandler.create());
router.get(API_V1 + "/business/all").handler(this::getAllBusinesses);

vertx.createHttpServer().requestHandler(router::accept).listen(8080);
}

private void getAllBusinesses(RoutingContext routingContext) {

vertx.deployVerticle(YelpClientVerticle.class.getName());
MessageConsumer<String> consumer = vertx.eventBus().consumer("api");

consumer.handler(message -> {
JsonObject m = new JsonObject((String) message.body());
System.out.println("Received message: " + message.body());
routingContext.response().putHeader("content-type", "application/json").end(m.encodePrettily());

});
}


}

This is my HTTP client, which calls the Yelp API:

public class YelpClientVerticle extends AbstractVerticle {

private static final String API_HOST = "api.yelp.com";
private static final String DEFAULT_TERM = "dinner";
private static final String DEFAULT_LOCATION = "San Francisco, CA";
private static final int SEARCH_LIMIT = 20;
private static final String SEARCH_PATH = "/v2/search";
private static final String BUSINESS_PATH = "/v2/business";

/*
* Update OAuth credentials below from the Yelp Developers API site:
* http://www.yelp.com/developers/getting_started/api_access
*/
private static final String CONSUMER_KEY = "XXXXX";
private static final String CONSUMER_SECRET = "XXXXX";
private static final String TOKEN = "XXXXX";
private static final String TOKEN_SECRET = "XXXXX";

OAuthService service;
Token accessToken;

/**
* Setup the Yelp API OAuth credentials.
*
* @param consumerKey Consumer key
* @param consumerSecret Consumer secret
* @param token Token
* @param tokenSecret Token secret
*/

/**
* Creates and sends a request to the Search API by term and location.
* <p>
* See <a href="http://www.yelp.com/developers/documentation/v2/search_api">Yelp Search API V2</a>
* for more info.
*
* @param term <tt>String</tt> of the search term to be queried
* @param location <tt>String</tt> of the location
* @return <tt>String</tt> JSON Response
*/
public String searchForBusinessesByLocation(String term, String location) {
OAuthRequest request = createOAuthRequest(SEARCH_PATH);
request.addQuerystringParameter("term", term);
request.addQuerystringParameter("location", location);
request.addQuerystringParameter("limit", String.valueOf(SEARCH_LIMIT));
return sendRequestAndGetResponse(request);
}

/**
* Creates and sends a request to the Business API by business ID.
* <p>
* See <a href="http://www.yelp.com/developers/documentation/v2/business">Yelp Business API V2</a>
* for more info.
*
* @param businessID <tt>String</tt> business ID of the requested business
* @return <tt>String</tt> JSON Response
*/
public String searchByBusinessId(String businessID) {
OAuthRequest request = createOAuthRequest(BUSINESS_PATH + "/" + businessID);
return sendRequestAndGetResponse(request);
}

/**
* Creates and returns an {@link OAuthRequest} based on the API endpoint specified.
*
* @param path API endpoint to be queried
* @return <tt>OAuthRequest</tt>
*/
private OAuthRequest createOAuthRequest(String path) {
OAuthRequest request = new OAuthRequest(Verb.GET, "https://" + API_HOST + path);
return request;
}

/**
* Sends an {@link OAuthRequest} and returns the {@link Response} body.
*
* @param request {@link OAuthRequest} corresponding to the API request
* @return <tt>String</tt> body of API response
*/
private String sendRequestAndGetResponse(OAuthRequest request) {
System.out.println("Querying " + request.getCompleteUrl() + " ...");
this.service.signRequest(this.accessToken, request);
Response response = request.send();
return response.getBody();
}

/**
* Queries the Search API based on the command line arguments and takes the first result to query
* the Business API.
*
* @param yelpApiCli <tt>YelpAPICLI</tt> command line arguments
*/
private String queryAPI(YelpAPICLI yelpApiCli) {
String searchResponseJSON =
searchForBusinessesByLocation(yelpApiCli.term, yelpApiCli.location);

JSONParser parser = new JSONParser();
JSONObject response = null;
try {
response = (JSONObject) parser.parse(searchResponseJSON);
} catch (ParseException pe) {
System.out.println("Error: could not parse JSON response:");
System.out.println(searchResponseJSON);
System.exit(1);
}

JSONArray businesses = (JSONArray) response.get("businesses");
JSONObject firstBusiness = (JSONObject) businesses.get(0);
String firstBusinessID = firstBusiness.get("id").toString();
System.out.println(String.format(
"%s businesses found, querying business info for the top result \"%s\" ...",
businesses.size(), firstBusinessID));

// Select the first business and display business details
String businessResponseJSON = searchByBusinessId(firstBusinessID.toString());
System.out.println(String.format("Result for business \"%s\" found:", firstBusinessID));
System.out.println(businessResponseJSON);

return businessResponseJSON;
}

/**
* Command-line interface for the sample Yelp API runner.
*/
private static class YelpAPICLI {
@Parameter(names = {"-q", "--term"}, description = "Search Query Term")
public String term = DEFAULT_TERM;

@Parameter(names = {"-l", "--location"}, description = "Location to be Queried")
public String location = DEFAULT_LOCATION;
}

@Override
public void start() throws Exception {
// Note! in real-life you wouldn't often set trust all to true as it could leave you open to man in the middle attacks.
this.service =
new ServiceBuilder().provider(TwoStepOAuth.class).apiKey(CONSUMER_KEY)
.apiSecret(CONSUMER_SECRET).build();
this.accessToken = new Token(TOKEN, TOKEN_SECRET);
YelpAPICLI yelpApiCli = new YelpAPICLI();
new JCommander(yelpApiCli);


String response = queryAPI(yelpApiCli);
vertx.eventBus().send("api", response);
}


}

I'm running into two problems.

The first problem is that the Yelp client takes too long to process the request and blocks the event loop thread, which produces this warning:

Jan 04, 2016 1:34:30 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-4,5,main] has been blocked for 3151 ms, time limit is 2000


The second problem is that only the first request works. The first time I go to localhost:8080/api/v1/business/all, the request returns successfully, but when I hit the URL again it throws an exception like this:

Jan 04, 2016 1:34:30 AM io.vertx.core.eventbus.impl.HandlerRegistration
SEVERE: Failed to handleMessage
java.lang.IllegalStateException: Response has already been written


How can I resolve these 2 problems?

Answer

The problem is that you're doing all the work - starting the Yelp Verticle, and registering a consumer on the event bus - on every request. This cannot be what you want.

So, what I think is happening:

  1. You make a request to your REST API.
  2. The getAllBusinesses() method is executed.
  3. A new YelpClientVerticle is deployed.
  4. A new handler is registered on the "api" event bus address; it will write to this request's response.
  5. Your YelpClientVerticle does a lot of blocking work - this is why you're getting the BlockedThreadChecker warning.
  6. Finally, the Yelp request returns and sends a message over the event bus, which then gets written to the response.
  7. You make another request.
  8. GOTO 2.

Your problem is that on every request you're starting another YelpClientVerticle, and registering another handler to listen on the same EventBus endpoint address.

It's totally acceptable to have multiple handlers listening on the same EventBus address. When a message is sent point-to-point with send(), Vert.x delivers it to only one of those handlers, chosen in round-robin fashion.

I'm guessing that on the second request, Vert.x is selecting the first handler, which still holds the first request's RoutingContext, and that handler tries to write to a response that has already been ended. Hence the error.
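To make that guess concrete, here is a small plain-Java model of it. It does not use Vert.x at all: FakeResponse and FakeAddress are hypothetical stand-ins, and the round-robin cursor is only my assumption about how selection behaves, not Vert.x's actual implementation. It simply replays "register a new handler per request, never unregister it" twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RoundRobinDemo {

    /** Hypothetical stand-in for an HTTP response that may only be ended once. */
    static class FakeResponse {
        private boolean ended = false;

        void end(String body) {
            if (ended) {
                throw new IllegalStateException("Response has already been written");
            }
            ended = true;
        }
    }

    /** Hypothetical stand-in for one bus address with round-robin, point-to-point delivery. */
    static class FakeAddress {
        private final List<Consumer<String>> handlers = new ArrayList<>();
        private int next = 0; // assumed cursor: wraps around the current handler list

        void consumer(Consumer<String> handler) {
            handlers.add(handler);
        }

        void send(String message) {
            Consumer<String> chosen = handlers.get(next);
            next = (next + 1) % handlers.size();
            chosen.accept(message);
        }
    }

    /** Replays two HTTP requests and records what happened on each. */
    static List<String> simulateTwoRequests() {
        FakeAddress api = new FakeAddress();
        List<String> outcomes = new ArrayList<>();
        for (int request = 1; request <= 2; request++) {
            FakeResponse response = new FakeResponse();
            // As in getAllBusinesses(): a new handler per request, never unregistered.
            api.consumer(body -> response.end(body));
            try {
                api.send("{}");
                outcomes.add("ok");
            } catch (IllegalStateException e) {
                outcomes.add(e.getMessage());
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // prints [ok, Response has already been written]
        System.out.println(simulateTwoRequests());
    }
}
```

On the second request the message goes to the handler left over from the first request, which closes over a response that was already ended. The second request's own handler never runs, so that request would also hang.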

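I would try moving the deployment of the YelpClientVerticle into the start() method of your RestVerticle - then you'll only have one instance.

You might want to switch the sender / consumer around, so that the RestVerticle sends a message to the YelpClientVerticle for each request and the YelpClientVerticle replies to that message. Each reply then goes back to the request that asked for it.

You also might want to read the documentation on Running Blocking Code, as your Yelp client looks like it's blocking: request.send() is a synchronous HTTP call, and it runs on an event loop thread.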
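Here is a rough plain-Java sketch of the request/reply shape I mean. It is a model, not Vert.x code: Request, FakeResponse and yelpWorker are hypothetical names. In Vert.x 3 the corresponding pieces would be the send overload that takes a reply handler on the REST side, and message.reply(...) inside a consumer registered once in the Yelp verticle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RequestReplyDemo {

    /** Hypothetical message that carries its own reply callback. */
    static class Request {
        final String term;
        private final Consumer<String> replyTo;

        Request(String term, Consumer<String> replyTo) {
            this.term = term;
            this.replyTo = replyTo;
        }

        void reply(String body) {
            replyTo.accept(body);
        }
    }

    /** Hypothetical stand-in for an HTTP response that may only be ended once. */
    static class FakeResponse {
        private String body;

        void end(String body) {
            if (this.body != null) {
                throw new IllegalStateException("Response has already been written");
            }
            this.body = body;
        }

        String body() {
            return body;
        }
    }

    /** Replays two HTTP requests against a single long-lived worker. */
    static List<String> simulateTwoRequests() {
        // Registered once, at startup: the only consumer on the Yelp side.
        Consumer<Request> yelpWorker =
                request -> request.reply("{\"term\":\"" + request.term + "\"}");

        List<String> bodies = new ArrayList<>();
        for (String term : new String[] {"dinner", "lunch"}) {
            FakeResponse response = new FakeResponse();
            // Per request: send a message whose reply callback is bound to THIS response.
            yelpWorker.accept(new Request(term, response::end));
            bodies.add(response.body());
        }
        return bodies;
    }

    public static void main(String[] args) {
        // prints [{"term":"dinner"}, {"term":"lunch"}]
        System.out.println(simulateTwoRequests());
    }
}
```

Nothing accumulates between requests here: there is one worker for the lifetime of the application, and the only per-request state is the reply callback, which is dropped once the reply has been delivered.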
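The general idea behind that documentation can be sketched with nothing but the JDK: hand the blocking call to a worker pool and get a future back immediately, so the calling thread stays free. This is an illustration of the technique, not Vert.x's API. Here offload and slowLookup are hypothetical names, and slowLookup just sleeps to imitate the slow Yelp call. In Vert.x itself you would look at vertx.executeBlocking or at deploying the Yelp verticle as a worker verticle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class OffloadDemo {

    /** Runs a blocking call on the given worker pool and returns a future immediately. */
    static CompletableFuture<String> offload(Supplier<String> blockingCall, ExecutorService workers) {
        return CompletableFuture.supplyAsync(blockingCall, workers);
    }

    /** Hypothetical stand-in for the slow, synchronous Yelp HTTP call. */
    static String slowLookup() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "{\"businesses\":[]}";
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            long start = System.nanoTime();
            CompletableFuture<String> result = offload(OffloadDemo::slowLookup, workers);
            long submitMillis = (System.nanoTime() - start) / 1_000_000;
            // The caller only paid for submitting the task, not for the lookup itself.
            System.out.println("caller was busy for about " + submitMillis + " ms");
            // join() blocks; it is used here only so the demo can print the result.
            System.out.println("result: " + result.join());
        } finally {
            workers.shutdown();
        }
    }
}
```

In a real handler you would not call join(); you would attach a callback (for example thenAccept) that writes the response once the result arrives.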

Hope this helps