Pierre Pierre - 5 months ago 109
Swift Question

Rxswift map + concat in parallel

This Observable is executing the following


  • Given a source observable

  • we use map to perform some async work

  • we use concat to return the result of the async work in order



The following is returning the desired result, but I would like to start
the async work in parallel.

What the correct way of doing it with Rx?

import RxSwift

func delay(time: Int, closure: () -> Void) {
dispatch_after(
dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
return Observable.create() { (observer) -> Disposable in
print(desc)
delay(time) {
observer.onNext(value)
observer.onCompleted()
}
return NopDisposable.instance
}
}

let seq = Observable
.of(1, 2, 3, 4, 5)
.map { (n) -> Observable<Int> in
return doAsyncWork(n,
desc: "start \(n) - wait \(5 - n)",
time: 6 - n
)
}
.concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }


This produce

//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5


The desired output would be

//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5

Answer

This seems to work not sure this is the best answer though

import RxSwift

func delay(time: Int, closure: () -> Void) {
  dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
    dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
  return Observable.create() { (observer) -> Disposable in
    print(desc)
    delay(time) {
      observer.onNext(value)
      observer.onCompleted()
    }
    return NopDisposable.instance
  }
}

let seq = Observable
  .of(1, 2, 3, 4, 5)
  .map { (n) -> Observable<Int> in
    let o = doAsyncWork(n,
      desc: "start \(n) - wait \(5 - n)",
      time: 6 - n
    ).shareReplay(1)
    o.subscribe()
    return o.asObservable()
  }
  .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }