Alleo Indong Alleo Indong - 2 months ago 22
Android Question

RxJava + Retrofit + Realm is doing unlimited get request

I am completely new to rxJava and it's really confusing, I want to make my app offline first and I've decided to use Realm and Retrofit, First I want to get the data from retrofit and then get the data from my remote webservice then, use realm's

insertOrUpdate
to merge the remote objects with the local one. I'm able to get on this process so far but when I looked into my Network requests on stetho, this method is complete requesting infinite times. Where did I go wrong? Here's the function

public Observable<RealmResults<Event>> all() {
Realm realm = Realm.getDefaultInstance();

return realm.where(Event.class).findAllAsync()
.asObservable()
.filter(new Func1<RealmResults<Event>, Boolean>() {
@Override
public Boolean call(RealmResults<Event> events) {
return events.isLoaded();
}
})
.doOnNext(new Action1<RealmResults<Event>>() {
@Override
public void call(RealmResults<Event> events) {
service.getEvents()
.subscribeOn(Schedulers.io())
.subscribe(new Action1<List<Event>>() {
@Override
public void call(final List<Event> events) {
try(Realm realm = Realm.getDefaultInstance()) {
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
realm.insertOrUpdate(events);
}
});
} // auto-close
}
});
}
});
}


and here's the function on my activity, where I use it

private void getEvents() {
Log.i("EVENTSELECTION", "STARTING");
repository.all()
.subscribe(new Subscriber<List<Event>>() {
@Override
public void onCompleted() {
Log.i("EVENTSELECTION", "Task Completed");
swipeRefreshLayout.setRefreshing(false);
}

@Override
public void onError(Throwable e) {
Log.e("EVENTSELECTION", e.getMessage());
swipeRefreshLayout.setRefreshing(false);
e.printStackTrace();
}

@Override
public void onNext(List<Event> events) {
Log.i("EVENTSELECTION", String.valueOf(events.size()));
}
});
}


Thank you so much.

Answer Source

Where did I go wrong?

Let's go through it:

1.

public Observable<RealmResults<Event>> all() {
    Realm realm = Realm.getDefaultInstance(); 

This opens a Realm instance that will never be closed. So your Realm lifecycle management is wrong, refer to the documentation for best practices.

2.

return realm.where(Event.class).findAllAsync()
        .asObservable() // <-- listens for changes in the Realm
// ...
        .doOnNext(new Action1<RealmResults<Event>>() {
            @Override
            public void call(RealmResults<Event> events) {
                service.getEvents() // <-- downloads data
                        .subscribeOn(Schedulers.io())
                        .subscribe(new Action1<List<Event>>() {

You basically say that "in case there are any changes made to data in Realm, then download data from the service and write it into the Realm"

Which will trigger the RealmChangeListener which will trigger a download and so on.

This is a conceptual error, you're using Realm notifications incorrectly.


RealmResults<T> is not just a list of objects, it is also a subscription for changes. So you need to keep it as a field reference, and "stay subscribed to changes in the database".

RealmResults<Sth> results;
RealmChangeListener<RealmResults<Sth>> changeListener = (element) -> {
    if(element.isLoaded()) {
        adapter.updateData(element);
    }
};

void sth() {
    results = realm.where(Sth.class).findAllSortedAsync("id");
    results.addChangeListener(changeListener);
}

void unsth() {
    if(results != null && results.isValid()) {
        results.removeChangeListener(changeListener);
        results = null;
    }
}

In your case, RealmResults<T> which symbolizes a subscription and also provides access to the current/new data is wrapped as an Observable<T> which you can create subscribers to.

Observable<List<<Sth>> results;
Subscription subscription;
Action1<List<Sth>> changeListener = (element) -> {
    if(element.isLoaded()) {
        adapter.updateData(element);
    }
};

void sth() {
    results = realm.where(Sth.class).findAllSortedAsync("id").asObservable();
    subscription = results.subscribe(changeListener);
}

void unsth() {
    if(subscription != null && !subscription.isUnsubscribed()) {
        subscription.unsubscribe();
        subscription = null; 
        results = null;
    }
}

As you can see, you have a subscription at the start of the component, and an unsubscription at the end of the component.

Calling Observable.first() is incorrect, it does not make sense to do that. If you saw it in any tutorial (I've seen it before...), then that tutorial was wrong.