savepopulation - 2 months ago
Android Question

Android Realm + RxJava - Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created

I'm trying to implement RxJava + Realm + Retrofit + Repository Pattern

Here's my local implementation:

@Override
public Observable<Page> search(@NonNull final String query) {
    return Realm.getDefaultInstance().where(Page.class)
            .equalTo("query", query)
            .findAll()
            .asObservable()
            .cast(Page.class);
}


Here's my remote implementation:

@Override
public Observable<Page> search(@NonNull String query) {
    return mWikiServices.search(query).map(new Func1<Result, Page>() {
        @Override
        public Page call(Result result) {
            final List<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
            return pages.get(0);
        }
    });
}


Here's my repo implementation:

final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
        .doOnNext(new Action1<Page>() {
            @Override
            public void call(Page page) {
                //mSearchLocalDataSource.save(query, page);
                //mResultCache.put(query, page);
            }
        });

return Observable.concat(localResult, remoteResult)
        .first()
        .doOnError(new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                throwable.printStackTrace();
            }
        });


And finally, here's my subscription in the presenter:

final Subscription subscription = mSearchRepository.search(this.mQuery)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Page>() {
            @Override
            public void onCompleted() {
                // Completed
            }

            @Override
            public void onError(Throwable e) {
                mView.onDefaultMessage(e.getMessage());
            }

            @Override
            public void onNext(Page page) {
                mView.onDefaultMessage(page.getContent());
            }
        });

mCompositeSubscription.add(subscription);


When I run the code I get this exception: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

I tried the official solutions in the Realm GitHub repo, but none of them worked. I still get this exception.

I think I get this exception because I'm subscribing on an io thread, while the Realm instance gets created on the main thread.

Any suggestions for how to implement this?

Thanks.

Answer

LAST EDIT: technically, both solutions work; it's a question of what Viraj Tank called "safe integration" vs "deep integration".

Still, the proper deep integration method would be to keep the download from the service separate, and to have a subscriber that listens to changes in the underlying Realm (realmResults.asObservable().subscribe()).



I honestly can't help but feel like this is conceptually flawed.

First things first: the Realm query is executed on the main thread, at the moment the observable is created:

@Override
public Observable<Page> search(@NonNull final String query) {

        return Realm.getDefaultInstance().where(Page.class)

This also creates an instance of Realm that will never be closed.

Additionally, it uses asObservable() in conjunction with first(), which makes me wonder why you add change listeners to your results in the first place through asObservable() rather than just calling Observable.just(results).

Then, it seems like the remote data source fetches the element, adds it to Realm, and shows the downloaded item immediately, rather than having the elements supplied by Realm itself through a change listener, which would provide auto-updates. In which case I'm not really sure what Realm is doing here.

Anyway, my initial guess would be that you might be able to make your code work with the following lines:

final Observable<Page> localResult = mSearchLocalDataSource.search(query)
                                                           .subscribeOn(AndroidSchedulers.mainThread());
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
            .subscribeOn(Schedulers.io())
            .doOnNext(new Action1<Page>() {
                @Override
                public void call(Page page) {
                    //mSearchLocalDataSource.save(query, page);
                    //mResultCache.put(query, page);
                }
            });

Considering you don't seem to be relying on Realm's auto-updating feature, you could consider using realm.copyFromRealm(obj) to create an unmanaged copy which can be passed between threads.
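
To illustrate why an unmanaged copy can cross threads while a managed object cannot, here is a minimal plain-Java sketch. It does not use Realm at all: `ManagedPage`, `PlainPage`, and `ThreadConfinementDemo` are hypothetical stand-ins, and the thread check only imitates the kind of check that produces the exception in the question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a managed object: usable only on its creating thread.
class ManagedPage {
    private final Thread owner = Thread.currentThread();
    private final String content;

    ManagedPage(String content) {
        this.content = content;
    }

    String getContent() {
        // Imitates a thread-confinement check (assumption: not Realm's actual code).
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("Access from incorrect thread");
        }
        return content;
    }

    // Plays the role of realm.copyFromRealm(obj): a snapshot with no thread affinity.
    PlainPage copy() {
        return new PlainPage(getContent());
    }
}

// Unmanaged snapshot: immutable, so any thread may read it.
class PlainPage {
    final String content;

    PlainPage(String content) {
        this.content = content;
    }
}

class ThreadConfinementDemo {
    // Runs the task on a separate worker thread and returns its result.
    static String runOnWorker(Callable<String> task) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            worker.shutdown();
        }
    }

    // Created on the calling thread, read on the worker: the check fails.
    static String readManagedOnWorker() {
        ManagedPage managed = new ManagedPage("hello");
        return runOnWorker(() -> {
            try {
                return managed.getContent();
            } catch (IllegalStateException e) {
                return "error: " + e.getMessage();
            }
        });
    }

    // Copied on the calling thread, read on the worker: no check involved.
    static String readCopyOnWorker() {
        PlainPage copy = new ManagedPage("hello").copy();
        return runOnWorker(() -> copy.content);
    }
}
```

The trade-off is the one already mentioned: the copy is a snapshot, so it never reflects later writes.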


But in reality, for proper Realm usage, you should have two subscriptions - a single data flow from the network to the Realm; and a subscription for RealmResults<Page>.asObservable() which would notify you when the pages are written to underneath by the network observable - check out Christian Melchior's post for the idea.
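
The two-subscription layout can be sketched without Realm or RxJava. In this rough plain-Java model, `PageStore` is a hypothetical in-memory stand-in for the Realm, `addChangeListener` plays the role of subscribing to the results observable, and the "downloaded" string is made up. The point is only that the view never receives the network result directly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the Realm: a store that notifies listeners on every write.
class PageStore {
    private final Map<String, String> pages = new HashMap<>();
    private final List<Consumer<Map<String, String>>> listeners = new ArrayList<>();

    // Registers a listener and (in this sketch) hands it the current state right away.
    void addChangeListener(Consumer<Map<String, String>> listener) {
        listeners.add(listener);
        listener.accept(new HashMap<>(pages));
    }

    // Plays the role of a write transaction: store the page, then notify everyone.
    void save(String query, String content) {
        pages.put(query, content);
        for (Consumer<Map<String, String>> listener : listeners) {
            listener.accept(new HashMap<>(pages));
        }
    }
}

class TwoFlowsDemo {
    static List<String> run() {
        PageStore store = new PageStore();
        List<String> rendered = new ArrayList<>();

        // Flow 1: the view only ever listens to the store.
        store.addChangeListener(snapshot ->
                rendered.add(snapshot.getOrDefault("realm", "<empty>")));

        // Flow 2: the network result is written to the store, never passed to the view.
        String downloaded = "Realm is a mobile database"; // hypothetical response
        store.save("realm", downloaded);

        return rendered;
    }
}
```

Here the view first renders the empty state and then re-renders once the write lands, which is the auto-update behaviour the single concat-based flow gives up.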

Personally, I skipped the Realm observable because the RealmRecyclerViewAdapter handled it. So if you're showing multiple elements in a RecyclerView, then the Realm observable is not even needed, because the RealmRecyclerViewAdapter manages its auto-updating through RealmChangeListener without relying on asObservable() to do it.



EDIT:

After forking the asker's repository as https://github.com/Zhuinden/wikilight , apparently I was right all along. The simple zero-copy solution would have been to add subscribeOn(AndroidSchedulers.mainThread()) for the local observable.

So surprisingly enough, not much changed.

    final Observable<Page> localResult = mSearchLocalDataSource.search(query).filter(new Func1<Page, Boolean>() {
        @Override
        public Boolean call(Page page) {
            return page != null;
        }
    }).subscribeOn(AndroidSchedulers.mainThread());
    final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query).subscribeOn(Schedulers.io())
            .doOnNext(new Action1<Page>() {
                @Override
                public void call(Page page) {
                    if (page != null) {
                        mSearchLocalDataSource.save(query, page);
                      //  mResultCache.put(query, page);
                    }
                }
            });


    return Observable.concat(localResult, remoteResult)
            .first()
            .map(new Func1<Page, Page>() {
                @Override
                public Page call(Page page) {
                    if (page == null) {
                        throw new NoSuchElementException("No result found!");
                    }
                    return page;
                }
            });
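
For readers unfamiliar with the concat-then-first idiom used above, its intent is "ask the local source, fall back to the remote one, fail if neither has anything". Here is a simplified plain-Java sketch of that intent, with no RxJava involved: `FirstAvailable` and `FirstAvailableDemo` are hypothetical names, and an empty `Optional` stands in for a source that completes without emitting.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;

class FirstAvailable {
    // Queries the sources in order and stops at the first one that yields a value.
    // Later sources are never invoked once a value has been found.
    static <T> T firstOf(List<Supplier<Optional<T>>> sources) {
        for (Supplier<Optional<T>> source : sources) {
            Optional<T> result = source.get();
            if (result.isPresent()) {
                return result.get();
            }
        }
        // Every source was empty.
        throw new NoSuchElementException("No result found!");
    }
}

class FirstAvailableDemo {
    // remoteCalls[0] counts how often the (fake) remote source was actually hit.
    static String lookup(Optional<String> cached, int[] remoteCalls) {
        Supplier<Optional<String>> local = () -> cached;
        Supplier<Optional<String>> remote = () -> {
            remoteCalls[0]++;
            return Optional.of("from network"); // hypothetical response
        };
        return FirstAvailable.firstOf(Arrays.asList(local, remote));
    }
}
```

With a cached value the remote supplier is never called; with an empty cache it is called exactly once. One difference worth keeping in mind: in the Rx version the remote source is only reached if the local observable completes without emitting, which is why the local source has to return an empty observable when nothing is stored.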

But to be fair, the original solution seems to cut auto-updating out of the picture, so no RealmChangeListeners are used in the solution, and neither is RealmObject.asObservable(), so copyFromRealm() does make more sense. To make auto-updating work, this

@Override
public Observable<Page> search(@NonNull final String query) {
    return Observable.create(new Observable.OnSubscribe<Page>() {
        @Override
        public void call(Subscriber<? super Page> subscriber) {
            Realm realm = null;
            try {
                realm = mRealmManager.getRealm();

                final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
                if(page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    subscriber.onNext(page);
                } else {
                    Log.i("data is", "empty");
                    Observable.empty();
                }
                subscriber.onCompleted();
            } finally {
                if(realm != null) {
                    mRealmManager.closeRealm(realm);
                }
            }
        }
    });
}

Should be replaced with this:

@Override
public Observable<Page> search(@NonNull final String query) {
    Realm realm = mRealmManager.getRealm(); // UI thread only!
    final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
    if(page != null) {
        Log.i("data from", "realm");
        return page.asObservable();
    } else {
        Log.i("data is", "empty");
        return Observable.empty();
    }
}

In the end, some additional architectural work could make this even better, but I think I'll just go to sleep.