ste9206 ste9206 - 1 month ago 22
Android Question

Fatal Exception thrown on Scheduler.Worker thread in my rxjava programm

I've made this project:

public class MainActivity
extends ActionBarActivity {

private Realm realm;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

RealmConfiguration defaultConfig = new RealmConfiguration.Builder().deleteRealmIfMigrationNeeded().build();
realm = Realm.getInstance(defaultConfig);
/**
* Set up Android CardView/RecycleView
*/

RecyclerView mRecyclerView = (RecyclerView) findViewById(R.id.recycler_view);
mRecyclerView.setHasFixedSize(true);
mRecyclerView.setLayoutManager(new LinearLayoutManager(this));
final CardAdapter mCardAdapter = new CardAdapter();
mRecyclerView.setAdapter(mCardAdapter);

/**
* START: button set up
*/
final Button bClear = (Button) findViewById(R.id.button_clear);
Button bFetch = (Button) findViewById(R.id.button_fetch);
bClear.setOnClickListener(new View.OnClickListener() {
public void onClick(View v) {
mCardAdapter.clear();
}
});

bFetch.setOnClickListener(new View.OnClickListener() {
public void onClick(View v) {
Retrofit retrofit = new Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(GithubService.SERVICE_ENDPOINT).build();

GithubService service = retrofit.create(GithubService.class);

service.getAirport()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<List<Airport>>() {
List<Airport> airps = new ArrayList<Airport>();

@Override
public void onCompleted() {
for(final Airport air : airps) {
realm.executeTransactionAsync(new Realm.Transaction() {
@Override
public void execute(Realm mRealm) {
AirportR airport = mRealm.createObject(AirportR.class);
airport.setId(air.getId());
}
}, new Realm.Transaction.OnSuccess() {
@Override
public void onSuccess() {
Log.wtf("ok", "ok");
}
}, new Realm.Transaction.OnError() {
@Override
public void onError(Throwable error) {
Log.e("ok", "non vaaa");
}
});
}
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(List<Airport> airports) {
airps = airports;
}
});
}
});
/**
* END: button set up
*/
}


where I want to save in my offline realm database all airports. When I launch it I get this error:

E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.example.githubdemo.app, PID: 30527
Theme: themes:{}
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5461)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.OnCompletedFailedException: Task java.util.concurrent.FutureTask@d76273a rejected from io.realm.internal.async.RealmThreadPoolExecutor@c1e8feb[Running, pool size = 9, active threads = 9, queued tasks = 100, completed tasks = 0]
at rx.observers.SafeSubscriber.onCompleted(SafeSubscriber.java:90)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:262)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at android.os.Handler.handleCallback(Handler.java:739) 
at android.os.Handler.dispatchMessage(Handler.java:95) 
at android.os.Looper.loop(Looper.java:148) 
at android.app.ActivityThread.main(ActivityThread.java:5461) 
at java.lang.reflect.Method.invoke(Native Method) 
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) 
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) 
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@d76273a rejected from io.realm.internal.async.RealmThreadPoolExecutor@c1e8feb[Running, pool size = 9, active threads = 9, queued tasks = 100, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2014)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:794)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1340)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:82)
at io.realm.internal.async.RealmThreadPoolExecutor.submitTransaction(RealmThreadPoolExecutor.java:71)
at io.realm.Realm.executeTransactionAsync(Realm.java:1340)
at app.activity.MainActivity$2$1.onCompleted(MainActivity.java:88)
at rx.observers.SafeSubscriber.onCompleted(SafeSubscriber.java:84)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:262) 
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:199) 
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
at android.os.Handler.handleCallback(Handler.java:739) 
at android.os.Handler.dispatchMessage(Handler.java:95) 
at android.os.Looper.loop(Looper.java:148) 
at android.app.ActivityThread.main(ActivityThread.java:5461) 
at java.lang.reflect.Method.invoke(Native Method) 
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) 
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) 


how could I fix it?

There are also an other way to save airports in realm using rxJava?

Thanks

Answer

Instead of queueing an asynchronous transaction for every single Airport instance you receive from your API - for batch inserts, you should consider using 1 asynchronous transaction instead, and the insert API.

So this:

for(final Airport air : airps) {
    realm.executeTransactionAsync(new Realm.Transaction() {
        @Override
        public void execute(Realm mRealm) {
            AirportR airport = mRealm.createObject(AirportR.class);
            airport.setId(air.getId());
        }
    }, new Realm.Transaction.OnSuccess() {
        @Override
        public void onSuccess() {
            Log.wtf("ok", "ok");
        }
    }, new Realm.Transaction.OnError() {
        @Override
        public void onError(Throwable error) {
            Log.e("ok", "non vaaa");
        }
    });
}

Should be this:

realm.executeTransactionAsync(new Realm.Transaction() {
    @Override
    public void execute(Realm realm) {
        AirportR airport = new AirportR();
        for(final Airport air : airps) {
            airport.setId(air.getId());
            realm.insertOrUpdate(airport);
        }
    }, new Realm.Transaction.OnSuccess() {
        @Override
        public void onSuccess() {
            Log.wtf("ok", "ok");
        }
    }, new Realm.Transaction.OnError() {
        @Override
        public void onError(Throwable error) {
            Log.e("ok", "non vaaa");
        }
    });
}