hiBrianLee - 3 months ago
Android Question

Chaining RxJava observables with callbacks/listeners

I am using Retrofit with Observables, and would like to chain the observables. Usually it works well with functions like map() or flatMap(), since the api returns an Observable that does the task. But in this case I have to do the following:


  1. getKey() from the api.

  2. Use the returned key in another library, Foo, and wait for the callback to be called.

  3. When the callback returns, send the result to the api.



I'd like this to be a single chained call so that I just have to subscribe once. I'm guessing I can use merge() or join() or something, but wasn't sure what the best approach would be to handle the callback.

Is there a way to make this better? This is what I have so far:

api.getKey().subscribe(new Action1<String>() {
    @Override
    public void call(String key) {
        Foo foo = new Foo();
        foo.setAwesomeCallback(new AwesomeCallback() {
            @Override
            public void onAwesomeReady(String awesome) {
                api.sendAwesome(awesome)
                        .subscribe(new Action1<Void>() {
                            @Override
                            public void call(Void aVoid) {
                                handleAwesomeSent();
                            }
                        });
            }
        });
        foo.makeAwesome();
    }
});

Answer

Adapting clemp6r's solution, here is another one that needs neither Subjects nor nested Subscriptions:

// RxJava 1.x types used: rx.Observable, rx.Subscriber, rx.functions.Func1, rx.functions.Action1
api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        // Wrap the callback-based Foo API in an Observable.
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> subscriber) {
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        // Emit only while someone is still subscribed.
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onCompleted();
                        }
                    }
                });
                foo.makeAwesome();
            }
        });
    }
}).flatMap(new Func1<String, Observable<Void>>() {
    @Override
    public Observable<Void> call(String awesome) {
        // sendAwesome() returns Observable<Void>, as in the question.
        return api.sendAwesome(awesome);
    }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});

In general, I think it is always possible to wrap any callback-based asynchronous operation in an Observable using Observable.create().
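
To try the wrapping idea without RxJava on the classpath, here is a minimal sketch that swaps in the JDK's CompletableFuture for Observable, with thenCompose() playing the role of flatMap(). It is an analogy, not the RxJava API. The Foo, AwesomeCallback, getKey() and sendAwesome() below are hypothetical stand-ins (the real Foo and api are not shown in the question), and passing the key to Foo's constructor is an assumption made only for illustration.

```java
import java.util.concurrent.CompletableFuture;

class CallbackChain {
    // Hypothetical stand-in for the library's callback interface.
    interface AwesomeCallback {
        void onAwesomeReady(String awesome);
    }

    // Hypothetical stand-in for the callback-based library.
    static class Foo {
        private final String key;
        private AwesomeCallback callback;

        Foo(String key) { this.key = key; } // assumption: the key is passed here

        void setAwesomeCallback(AwesomeCallback callback) { this.callback = callback; }

        void makeAwesome() {
            // Invoke the callback from another thread, like a real async library might.
            new Thread(() -> callback.onAwesomeReady("awesome-" + key)).start();
        }
    }

    // Hypothetical stand-in for api.getKey().
    static CompletableFuture<String> getKey() {
        return CompletableFuture.completedFuture("key42");
    }

    // The wrapping step: the future completes when the callback fires.
    static CompletableFuture<String> makeAwesome(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        Foo foo = new Foo(key);
        foo.setAwesomeCallback(result::complete);
        foo.makeAwesome();
        return result;
    }

    // Hypothetical stand-in for api.sendAwesome().
    static CompletableFuture<String> sendAwesome(String awesome) {
        return CompletableFuture.completedFuture("sent:" + awesome);
    }

    // One chain, one place to wait for the final result.
    static CompletableFuture<String> run() {
        return getKey()
                .thenCompose(CallbackChain::makeAwesome)
                .thenCompose(CallbackChain::sendAwesome);
    }

    public static void main(String[] args) {
        System.out.println(run().join()); // prints "sent:awesome-key42"
    }
}
```

The shape is the same as in the RxJava version: the callback is registered inside the wrapper, and the caller only sees a composable asynchronous value. Unlike an Observable, a CompletableFuture carries a single value and has no unsubscribe check, so the isUnsubscribed() guard has no direct counterpart here.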
