Shvalb Shvalb - 6 months ago 18
Java Question

Emitting Array of objects with delay between each object

I have an array of object and I would like to emit each object and then 5 seconds delay. at the end only after all objects were emitted then complete.

Here is my code which doesn't quite do that,

public class SequentialLoopWithDelayTest {

@Test
public void test() {
System.out.println("Start: " + DateTime.now().toString());
rx.Observable.from(new String[] {"Test_1", "Test_2", "Test_3"})
.flatMap(str -> {
return printObservable(str)
.delay(5, TimeUnit.SECONDS);
})
.subscribe(results -> {
System.out.println("End: " + DateTime.now().toString());
});

}

private static rx.Observable<String> printObservable(String str) {
System.out.println(DateTime.now().toString() + ", " + str);
return rx.Observable.just(str);
}
}


How can I fix this?

Notice: I can't use any blocking!

Thanks :-)

Answer

Zip with an observable created using interval

List<String> testList = Arrays.asList(new String[] {"Test_1", "Test_2", "Test_3"});
Observable<String> test = Observable.from(testList).zipWith(Observable.interval(0,5000,TimeUnit.MILLISECONDS), (a,b) -> a);
test.subscribe(value -> System.out.println(value + " Emitted at : " + DateTime.now().toString()), error->{},()-> System.out.print("Completed"));

Full code

import rx.Observable;
import rx.Subscriber;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.List;
import org.joda.time.DateTime;

class TimerTest {
    public static void main(String[] args) {
        List<String> testList = Arrays.asList(new String[] {"Test_1", "Test_2", "Test_3"});
        Observable<String> test = Observable.from(testList).zipWith(Observable.interval(0,5000,TimeUnit.MILLISECONDS), (a,b) -> a);
        test.subscribe(value -> System.out.println(value + " Emitted at : " + DateTime.now().toString()), error->{},()-> System.out.print("Completed"));

        try {
            // Sleep so the program doesn't exit immediately
            Thread.sleep(50000);
        }
        catch (Exception e) {

        }
    }
}

To delay the completion

Observable<String> test2 = test.concatWith(Observable.<String>empty().delay(2000,TimeUnit.MILLISECONDS))