Jaythaking Jaythaking - 1 year ago 87
Android Question

Using RxJava to chain request on a single thread

I'm saving the user's location in the app local database and then send it to the server. Once the server return a success, I delete the location that was sent.

Each time a point has been saved in the database I call this method:

public void sendPoint(){
.map(listIdsSent -> deleteDatabasePoints(listIdsSent))
.doOnCompleted(() -> emitStoreChange(finalEvent))

  1. I query the database for the point to be send to the server

  2. I received from the server the list of point successfully sent

  3. Using
    , I gather the point successfully sent and delete them from the local database

Sometimes, It happens that I call this method repeatedly without having wait for the previous request to be completed and deleted the point sent. So, when I call that method again, it will post the same point as the previous request because that previous request isn't completed yet thus haven't deleted the point using the
yet. Causing the server to receive duplicates...


  • 1st Call to

  • Retrive point A,B,C from the database

  • Post point A,B,C to the server

  • 2nd call to

  • Retrive point A,B,C,D from the database

  • Post point A,B,C,D to the server

  • Receive success from the 1st request

  • Deleting A,B,C from the local database

  • Receive success from the 2nd request

  • Deleting A,B,C,D from the local database


The server database now have received : A,B,C,A,B,C,D

Each request occurs sequentially but somehow the same location points are sent to the server when I call
too quickly. How can I fix this?

Answer Source

First to all you are not using observerOn operator properly, observeOn operator is applied over the steps in your pipeline, once is defined. So if you define at the end of the pipeline just before subscribeOn, then none of your previous steps will be executed in that thread.

Also, since you need to wait until the response of your server call, you can use the callbacks handlers that Subscriber already provide (onNext(), onComplete())

 public void sendPoint(){
              .flatMap(poins-> amazonRetrofit.postAmazonPoints(points))
              .subscribe(listIdsSent-> deleteDatabasePoints(listIdsSent), () -> emitStoreChange(finalEvent));

if you want to see more examples of ObserverOn and SubscribeOn you can take a look here. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download