Abdelrhman Talat - 1 month ago
Java Question

From RxJava 1 to RxJava 2

I'm trying to convert this RxJava 1 code to RxJava 2:

    public static Observable<Path> listFolder(Path dir, String glob) {
        return Observable.<Path>create(subscriber -> {
            try {
                DirectoryStream<Path> stream =
                        Files.newDirectoryStream(dir, glob);

                subscriber.add(Subscriptions.create(() -> {
                    try {
                        stream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }));
                Observable.<Path>from(stream).subscribe(subscriber);
            } catch (DirectoryIteratorException ex) {
                subscriber.onError(ex);
            } catch (IOException ioe) {
                subscriber.onError(ioe);
            }
        });
    }


The problem is that in RxJava 2 I don't get a subscriber to which I can add a new subscription.

Answer

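Enjoy RxJava 2's conciseness (Flowable is now the backpressure-supporting class). Flowable.using ties the stream's lifetime to the subscription: the resource is opened on subscribe and closed when the flow terminates or is cancelled, so you don't need to register a subscription on the subscriber yourself: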

public static Flowable<Path> listFolder(Path dir, String glob) {
    return Flowable.using(
        () -> Files.newDirectoryStream(dir, glob),
        stream -> Flowable.fromIterable(stream),
        stream -> stream.close());
}
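
For comparison, the three lambdas passed to Flowable.using follow the same acquire / use / release pattern as a plain try-with-resources block. The sketch below is a blocking counterpart that uses only the JDK; the class name ListFolderBlocking and the method listFolderBlocking are hypothetical names chosen for illustration and are not part of RxJava or of the original question.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical blocking counterpart of listFolder, for comparison only.
final class ListFolderBlocking {

    static List<Path> listFolderBlocking(Path dir, String glob) throws IOException {
        List<Path> result = new ArrayList<>();
        // Acquire: plays the role of the first lambda given to Flowable.using.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            // Use: plays the role of Flowable.fromIterable(stream).
            for (Path entry : stream) {
                result.add(entry);
            }
        } // Release: stream.close() is called here, like the third lambda.
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Lists every entry of the current working directory.
        for (Path entry : listFolderBlocking(Paths.get("."), "*")) {
            System.out.println(entry);
        }
    }
}
```

The difference is that the blocking version reads the whole directory before returning, whereas the Flowable version opens the directory only when someone subscribes and emits entries as the subscriber requests them.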