Jaythaking Jaythaking - 1 year ago 109
Android Question

Merge and handle two RxJava Observable of different types

My goal

I want to check if the server's token is still valid, let's say I know that information just by calling this getter :

. Then, if the token is invalid, calling a request to get a new token and updating the token locally, THEN, proceed with the next request to post the point to the server. That's because I need a valid token in order to make any further server request.

Let say I have those two server request that returns

This request is meant to get the server token, then upon reception, updating it.

Observable<Response<EntityToken>> updateServerToken = retrofitApi.authenticate(username,password);

This request is meant to post the current location to the server, then if it succeed, return the saved point

Observable<Response<EntityPoint>> updateServerToken = retrofitApi.postPoint(point);

Issues i'm facing currently:

  • Both observable that needs to be merged are from different type

  • Executing the token update request only if it needs to

  • Waiting for the token update request to complete before executing the request to post points

How should I write my RxJava
to satisfy all those condition?

Answer Source

First, I would create a method that checks if the entityToken is valid or not. If valid, use Observable.just() but you have to create an instance of Response somehow. If invalid, then call the server using the API in your requirement retrofitApi.authenticate(). Either path is taken, the method getTokenObservable() emits Observable<Response<EntityToken>>.

public Observable<Response<EntityToken>> getTokenObservable(EntityToken entityToken, String username, String password) {    
  boolean isTokenValid = preferenceHelper.isTokenValid(entityToken);
  if (isTokenValid) {
    //my assumption that you have something like this
    Response<EntityToken> responseToken = new Response<EntityToken>(); 
    return Observable.just(new Response<EntityToken>(entityToken.class));
  } else {
    Observable<Response<EntityToken>> updateServerToken = retrofitApi.authenticate(username, password);
    return updateServerToken;

and then when calling it, use flatMap() which take emisssions of Observable<Response<EntityToken>> and returns emissions of Observable<Response<EntityPoint>>. Subscribe and proceed as normal.

Observable<Response<EntityToken>> updatePointObservable = getTokenObservable(entityToken, username, password);
.flatMap(new Func1<Response<EntityToken>, Observable<Response<EntityPoint>>>() {
    public Observable<Response<EntityPoint>> call(Response<EntityToken> responseToken) { 
        EntityToken entityToken = responseToken.getEntityToken(); //my assumption
        saveTokenLocally(entityToken); //this is where you save your token locally, change to the right method that you have
        Observable<Response<EntityPoint>> updateServerTokenObservable = retrofitApi.postPoint(point, entityToken); //pass your entityToken to the call?
        return updateServerTokenObservable;     
.subscribe(new Observer<Response<EntityPoint>>() {
    public void onCompleted() {
        //your own logic

    public void onError(Throwable e) {
        //your own logic

    public void onNext(Response<EntityPoint> entityPoint) {
        //your own logic