H3x0n - 7 months ago
Java Question

RxJava: load data only once when the same observable is subscribed to multiple times

I have the following problem:

I have an observable that does some work internally, and other observables need its output in order to work. I tried returning the API from an observable that creates it, but the log shows that, because multiple methods subscribe to this observable, the API gets initialized multiple times.

This is the observable that creates the object:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
    if (mApi == null) {
        // do some work
    }
    subscriber.onNext(mApi);
    subscriber.unsubscribe();
})


This is the observable that needs the object:

loadApi().flatMap(api -> api....()));


I'm using

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(Schedulers.io())


on all observables.

Answer

I'm not sure that I understood your question correctly, but I figure you're looking for a way to share the emissions of one observable between several subscribers, so that the source runs only once. There are several ways of doing this. For one, you could use a ConnectableObservable like so:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub 1 " + item));
obs.subscribe(item -> System.out.println("Sub 2 " + item));
obs.connect(); //Now the source observable starts emitting items

Output:

Sub 1 1
Sub 2 1
Sub 1 2
Sub 2 2
Sub 1 3
Sub 2 3
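To make it clearer why the output interleaves like that, here is a rough plain-JDK sketch of the "subscribe first, connect later" idea. It is not RxJava and not how the library is implemented. MiniConnectable, MiniConnectableDemo and sourceRuns are hypothetical names made up for illustration. The point is only that the source runs once, on connect(), and every item is handed to each registered subscriber in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a connectable source over range(start, count).
// Not RxJava: it only mimics the "subscribe first, connect later" behaviour.
class MiniConnectable {
    private final int start;
    private final int count;
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();
    int sourceRuns = 0; // how many times the underlying source was executed

    MiniConnectable(int start, int count) {
        this.start = start;
        this.count = count;
    }

    // Registering a subscriber does not start the source.
    void subscribe(Consumer<Integer> subscriber) {
        subscribers.add(subscriber);
    }

    // connect() runs the source once and fans each item out to every subscriber.
    void connect() {
        sourceRuns++;
        for (int item = start; item < start + count; item++) {
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }
}

class MiniConnectableDemo {
    // Returns the lines both subscribers would log, in emission order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniConnectable obs = new MiniConnectable(1, 3);
        obs.subscribe(item -> log.add("Sub 1 " + item));
        obs.subscribe(item -> log.add("Sub 2 " + item));
        obs.connect(); // the single run of the source happens here
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running MiniConnectableDemo prints the same six lines as the output above, and sourceRuns stays at 1 no matter how many subscribers were registered before connect().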

Alternatively, you could use a PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub 1 " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub 2 " + item)); 
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable

Output:

Sub 1 1
Sub 2 1
Sub 1 2
Sub 2 2
Sub 1 3
Sub 2 3

Both of these examples are single-threaded, but you can easily add observeOn or subscribeOn calls to make them asynchronous.
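For the loadApi() case in the question, the underlying idea is that the expensive initialization runs once and every later caller gets the same result. Below is a minimal library-free sketch of that idea using CompletableFuture from the JDK. It is an illustration only, not RxJava code: Api here is a hypothetical placeholder for the asker's type, and ApiLoader and initCount are names I made up. In RxJava 1 itself, the cache() operator gives a comparable "run once, replay to later subscribers" effect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical placeholder for the asker's Api type.
class Api {
    final int id;

    Api(int id) {
        this.id = id;
    }
}

// Hypothetical loader: creates the Api at most once and shares it with all callers.
class ApiLoader {
    private final AtomicInteger initCount = new AtomicInteger();
    private CompletableFuture<Api> cached; // guarded by "this"

    // Every caller receives the same future, so the expensive work runs only once.
    synchronized CompletableFuture<Api> loadApi() {
        if (cached == null) {
            // Stand-in for "do some work"; runs asynchronously on the common pool.
            cached = CompletableFuture.supplyAsync(() -> new Api(initCount.incrementAndGet()));
        }
        return cached;
    }

    // Number of times the Api was actually initialized.
    int initCount() {
        return initCount.get();
    }
}

class ApiLoaderDemo {
    public static void main(String[] args) {
        ApiLoader loader = new ApiLoader();
        Api first = loader.loadApi().join();
        Api second = loader.loadApi().join();
        System.out.println(first == second);    // prints true: same instance
        System.out.println(loader.initCount()); // prints 1: initialized once
    }
}
```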
