Mavo Mavo - 1 year ago 57
Java Question

How to properly manage closable resources in Reactor

I have a http client and executor which should be closed when all work is done.

I'm trying to use Flux.using method in a way it's described here for RxJava 1.x:

My resource creating method:

public static Flux<GithubClient> createResource(String token,
int connectionCount) {

return Flux.using(
() -> { + " : Created and started the client.");
return new GithubClient(token, connectionCount);
client -> { + " : About to create Observable.");
return Flux.just(client);
client -> { + " : Closing the client.");
).doOnSubscribe(subscription ->"subscribed"));

Which I then use:

Flux<StateMutator> dataMutators = GithubClient.createResource(
.flatMap(client -> client.loadRepository(organization, repository)

The problem is that the client connection is closed even before first request is made.

[main] INFO - main : Created and started the client.
[main] INFO - main : About to create Observable.
[main] INFO - subscribed
[main] INFO - main : Closing the client.

java.lang.IllegalStateException: Client instance has been closed.

at org.glassfish.jersey.client.JerseyClient.checkNotClosed(

Did not find any example for Reactor.


Answer Source

I read the documentation for using again and found my error. Returning the client with return Flux.just(client); does not makes sense as the Flux terminates immediatelly which triggers client closing.

I ended up implementing:

public static Flux<StateMutator> createAndExecute(GithubPublicConfiguration config,
                                                  Function<GithubClient, Flux<StateMutator>> toExecute) {

    return Flux.using(
            () -> {
                logger.debug(Thread.currentThread().getName() + " : Created and started the client.");
                return new GithubClient(entityModelHandler, config.getAccessToken(), config.getConnectionCount());
            client -> toExecute.apply(client),
            client -> {
                logger.debug(Thread.currentThread().getName() + " : Closing the client.");

which I then call with:

            client -> client.loadRepository(organization, repository))

Now all operations are in appropriate order.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download