Leonardo Ferrari Leonardo Ferrari - 4 months ago 175
Android Question

Threading in Realm - Clean Code Architecture

I'm using the Clean Code Architecture on Android and Dagger with Realm, but I can't find a way to make it work together.
Bottom line is, I'm always getting:

java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.


FetchItemsRepo:

@Override
public Observable<List<Item>> fetchAllItems() {
final List<Item> items = new ArrayList<>();
final Realm realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());

final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);

return queryCheck.findAll()
.asObservable()
.flatMap(new Func1<RealmResults<Checkboxes>, Observable<RealmResults<ImageItem>>>() {
@Override
public Observable<RealmResults<ImageItem>> call(RealmResults<Checkboxes> checkboxes) {
for (Checkboxes checks : checkboxes)
items.add(checks);

return queryImage.findAll().asObservable();
}
})
.flatMap(new Func1<RealmResults<ImageItem>, Observable<List<Item>>>() {
@Override
public Observable<List<Item>> call(RealmResults<ImageItem> imageItems) {
for (ImageItem images : imageItems)
items.add(images);

return Observable.create(new Observable.OnSubscribe<List<Item>>() {
@Override
public void call(Subscriber<? super List<Item>> subscriber) {
subscriber.onNext(items);
}
});
}
});
}


GetItemsUseCase:

public class GetItemsUseCase extends UseCase {

private final ItemsRepository itemsRepository;

@Inject
public GetItemsUseCase(ItemsRepository itemsRepository, ThreadExecutor threadExecutor, PostExecutionThread postExecutionThread) {
super(threadExecutor, postExecutionThread);
this.itemsRepository = itemsRepository;
}

@Override
protected Observable buildUseCaseObservable() {
return this.itemsRepository.fetchAllItems();
}
}


And the UseCase:

public void execute(Subscriber UseCaseSubscriber) {
this.subscription = this.buildUseCaseObservable()
.subscribeOn(Schedulers.from(threadExecutor))
.observeOn(postExecutionThread.getScheduler())
.subscribe(UseCaseSubscriber);
}


I'm not sure if there is any way to make Realm works with this pattern.
The entire project is open-sourced on my GitHub: https://github.com/leonardo2204/materialnotes/tree/bug_thread_realm

Edit:

Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:456)
at io.realm.RealmResults.addChangeListener(RealmResults.java:926)
at io.realm.rx.RealmObservableFactory$5.call(RealmObservableFactory.java:147)
at io.realm.rx.RealmObservableFactory$5.call(RealmObservableFactory.java:131)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:841)

Answer

You need to make your Realm objects detached in your RealmResults using realm.copyFromRealm().

EDIT: it says your problem is that your Realm instance is not on the same thread as where you're subscribing, which is fair if you are subscribing on Schedulers.from(threadExecutor) but you're initializing this query on the UI thread.

asObservable() won't work on a background thread anyways, only on the UI thread - because it needs to add a change listener, and the change listener needs the main looper to receive updates.

Honestly at this rate you could either do this (and lose auto-updates):

    final List<Item> items = new ArrayList<>();
    Realm realm = null;
    try {
         realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());

        final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
        final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);

        RealmResults<Checkboxes> checkBoxes = queryCheck.findAll();
        RealmResults<ImageItem> imageItems = queryImage.findAll();

        for(Checkboxes checkbox : checkBoxes) {
            items.add(realm.copyFromRealm(checkbox));
        }
        for(ImageItem checkbox : checkBoxes) {
            items.add(realm.copyFromRealm(checkbox));
        }
    } finally {
        if(realm != null) {
            realm.close();
        }
    }
    return Observable.just(items);

Or just rethink the way you're doing this entirely, because the optimal solution would be to use findAllAsync() from the UI thread, and if you could obtain both lists in one result list.

EDIT2: It's worth noting that the intended usage of asObservable() is to replace addChangeListener(), but you can only "observe changes" to the Realm on looper threads (typically the UI thread).

So the intended usage of asObservable() is the following

private Subscription readFromEditText() {
    return RxTextView.textChanges(editText).switchMap(charSequence -> {
        String selectedName = charSequence.toString();
        RealmQuery<Dog> query = realm.where(Dog.class);
        if(selectedName != null && !"".equals(selectedName)) {
            query = query.contains(DogFields.NAME, selectedName, Case.INSENSITIVE);
        }
        return query.findAllSortedAsync(DogFields.NAME)
                    .asObservable();
    }).filter(RealmResults::isLoaded) //filter async realm query
      .subscribe(dogs -> adapter.updateData(dogs));
}
Comments