Jaythaking - 2 months ago
Android Question

Using RxJava to chain requests on a single thread

I'm saving the user's location in the app's local database and then sending it to the server. Once the server returns a success, I delete the location that was sent.

Each time a point has been saved in the database I call this method:

public void sendPoint(){
    amazonRetrofit.postAmazonPoints(databaseHelper.getPoints())
            .map(listIdsSent -> deleteDatabasePoints(listIdsSent))
            .doOnCompleted(() -> emitStoreChange(finalEvent))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.from(backgroundLooper))
            .subscribe();
}



  1. I query the database for the points to be sent to the server.

  2. I receive from the server the list of points that were successfully sent.

  3. Using .map(), I gather the points that were successfully sent and delete them from the local database.



Sometimes I call this method again before the previous request has completed and its points have been deleted. The new call then posts the same points as the previous request, because that request hasn't yet reached the .map() step that deletes them, so the server receives duplicates.

Timeline





  • 1st call to sendPoint()

  • Retrieve points A,B,C from the database

  • Post points A,B,C to the server

  • 2nd call to sendPoint()

  • Retrieve points A,B,C,D from the database

  • Post points A,B,C,D to the server

  • Receive success from the 1st request

  • Delete A,B,C from the local database

  • Receive success from the 2nd request

  • Delete A,B,C,D from the local database



Result:

The server database has now received: A,B,C,A,B,C,D
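
The timeline can be replayed deterministically in plain Java, without RxJava or Android. This is only a sketch: the class DuplicatePostTimeline and the two in-memory lists standing in for the local database and the server are hypothetical, and the interleaving is hard-coded to match the timeline rather than produced by real threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Single-threaded replay of the timeline: both posts happen before either delete.
class DuplicatePostTimeline {

    static List<String> replay() {
        // Hypothetical stand-ins for the local database and the server.
        List<String> localDb = new ArrayList<>(Arrays.asList("A", "B", "C"));
        List<String> server = new ArrayList<>();

        // 1st call: read A,B,C and post them; the response has not arrived yet.
        List<String> firstBatch = new ArrayList<>(localDb);
        server.addAll(firstBatch);

        // Point D is saved, which triggers the 2nd call while A,B,C are still stored.
        localDb.add("D");
        List<String> secondBatch = new ArrayList<>(localDb);
        server.addAll(secondBatch);

        // The responses arrive; only now are the sent points deleted.
        localDb.removeAll(firstBatch);
        localDb.removeAll(secondBatch);

        return server;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [A, B, C, A, B, C, D]
    }
}
```

The duplicates come from the second read happening before the first delete, not from the requests themselves running out of order.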

Each request occurs sequentially, but somehow the same location points are sent to the server when I call sendPoint() too quickly. How can I fix this?

Answer

First of all, you are not using the observeOn operator properly. observeOn applies only to the steps that come after it in the pipeline. If you place it at the end of the pipeline, just before subscribeOn, none of the preceding steps will run on that thread.

Also, since you need to wait for the response to your server call, you can use the callback handlers that Subscriber already provides (onNext(), onCompleted()).

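public void sendPoint(){
    // fromCallable defers the database read until subscription; Observable.from(...)
    // would emit each point separately instead of the whole list.
    Observable.fromCallable(() -> databaseHelper.getPoints())
              .observeOn(AndroidSchedulers.mainThread())
              .flatMap(points -> amazonRetrofit.postAmazonPoints(points))
              .subscribeOn(AndroidSchedulers.from(backgroundLooper))
              // RxJava 1 has no subscribe(onNext, onCompleted) overload, so an error handler is passed too.
              .subscribe(listIdsSent -> deleteDatabasePoints(listIdsSent),
                         error -> error.printStackTrace(),
                         () -> emitStoreChange(finalEvent));
}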
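
Separately from the RxJava code, the idea of waiting for each response before starting the next request can be sketched in plain Java. This is not the answerer's method: it assumes the post can be made blocking, and it swaps the RxJava schedulers for a single-thread ExecutorService. SerializedSendSketch, savePoint, sendPoints, and the in-memory lists are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Every save and every read -> post -> delete cycle is one task on a single
// worker thread, so a cycle finishes before the next one starts.
class SerializedSendSketch {
    // Hypothetical stand-ins; both lists are only touched on the worker thread.
    private final List<String> localDb = new ArrayList<>();
    private final List<String> server = new ArrayList<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void savePoint(String point) {
        worker.execute(() -> localDb.add(point));
    }

    void sendPoints() {
        worker.execute(() -> {
            List<String> batch = new ArrayList<>(localDb); // read
            server.addAll(batch);      // stands in for a blocking POST that succeeds
            localDb.removeAll(batch);  // delete only what was sent
        });
    }

    // Replays the question's scenario: A,B,C saved, 1st send, D saved, 2nd send.
    static List<String> replay() {
        SerializedSendSketch s = new SerializedSendSketch();
        try {
            for (String p : Arrays.asList("A", "B", "C")) {
                s.savePoint(p);
            }
            s.sendPoints();
            s.savePoint("D");
            s.sendPoints();
            // Read the result on the worker thread, after all queued tasks have run.
            Callable<List<String>> snapshot = () -> new ArrayList<>(s.server);
            return s.worker.submit(snapshot).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            s.worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [A, B, C, D]
    }
}
```

Because the first cycle deletes A,B,C before the second cycle reads the database, the second post contains only D.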

If you want to see more examples of observeOn and subscribeOn, you can take a look here: https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
