BeetleJuice - 18 days ago
TypeScript Question

RxJS 5 with Angular 2: replay subscription on schedule determined by previous result

In my Angular 2 / TypeScript 2 app, I query the server for a value that needs to be updated periodically. The delay between updates is variable (the server sends an expiration date along with the value).

I'm having trouble composing an Observable stream that will replay (initiate a new call to the server) automatically once the current value reaches expiration. What I have so far doesn't scale at all:

price = 5; // initial value is known
expires = ...; // initial expiration is known

// server returns {expires: number, price: number}
getData() { return this.http.get('...').map(res => res.json()); }

Observable.timer(...) // when initial price expires
    .switchMap(() => this.getData()) // fetch new price and expiration
    .subscribe(data => {
        this.price = data.price;
        Observable.timer(...) // redo when price expires
            .subscribe(...); // callback hell (endless inner blocks)
    });

There must be a better way to schedule the follow-up calls.


My approach was to use switchMap() along with timer() to delay the start of the observable chain, and repeat() to keep seeking fresh data once the previous price expires:

price = 0; // initial price
_exp = 0;  // initial delay (in seconds) before fetching data

/** Emits once after the current expiration delay has elapsed. */
wait() { return Observable.timer(this._exp * 1000); }

stream$ = Observable.of(null)
    .switchMap(() => this.wait())          // wait for the expiration delay
    .switchMap(() => this.getServerData()) // get fresh data
    .do(e => { this._exp = e.expires; })   // update expiration
    .repeat();                             // repeat until the calling code unsubscribes
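
To make the pacing of that chain easier to check, here is a rough sketch that replays the wait, fetch, update-delay, repeat cycle on a virtual clock without RxJS. The names simulateCycles and FetchEvent and the sample responses are made up for illustration, and fetch latency is ignored; it only mirrors the arithmetic of the stream, it is not the stream itself.

```typescript
// Shape of the server response as described in the question.
interface PriceData { expires: number; price: number; }

// Hypothetical record of when a fetch happened on the virtual clock.
interface FetchEvent { atSeconds: number; price: number; }

/**
 * Walks through successive server replies and records when each fetch
 * would fire if every cycle first waits for the previous `expires` value.
 */
function simulateCycles(responses: PriceData[], initialDelaySeconds = 0): FetchEvent[] {
  const events: FetchEvent[] = [];
  let clock = 0;
  let delay = initialDelaySeconds; // plays the role of this._exp
  for (const response of responses) {
    clock += delay;                // wait(): timer(this._exp * 1000)
    events.push({ atSeconds: clock, price: response.price }); // getServerData()
    delay = response.expires;      // do(e => this._exp = e.expires)
  }
  return events;
}

// Made-up replies: each one tells the loop how long to wait before the next fetch.
const timeline = simulateCycles([
  { expires: 2, price: 5 },
  { expires: 5, price: 6 },
  { expires: 1, price: 7 },
]);
// Fetches land at t = 0 s, 2 s and 7 s.
```

With an initial delay of 0 the first fetch fires at once, and every later fetch is offset by the expires value of the reply before it.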

When I subscribe, the first price is fetched immediately, and the sequence is repeated indefinitely with each cycle delayed by expires seconds. I can update the model with prices as they arrive:

ngOnInit() { this.stream$.subscribe(e => this.price = e.price); }
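
Because repeat() never completes on its own, the loop only stops when the subscriber unsubscribes. Below is a minimal sketch of keeping the subscription and releasing it in ngOnDestroy. To stay runnable without Angular or RxJS it uses small stand-in interfaces; PriceComponent, PriceStream and ManualStream are hypothetical names, and in the real app the stream would be the stream$ defined above.

```typescript
// Minimal stand-ins for the RxJS types, so the sketch runs without the library.
interface PriceData { expires: number; price: number; }
interface Unsubscribable { unsubscribe(): void; }
interface PriceStream {
  subscribe(next: (data: PriceData) => void): Unsubscribable;
}

// Hypothetical component: keeps the subscription so it can be torn down later.
class PriceComponent {
  price = 0;
  private stream$: PriceStream;
  private subscription: Unsubscribable | null = null;

  constructor(stream$: PriceStream) {
    this.stream$ = stream$;
  }

  ngOnInit(): void {
    this.subscription = this.stream$.subscribe(e => { this.price = e.price; });
  }

  ngOnDestroy(): void {
    if (this.subscription) {
      this.subscription.unsubscribe(); // in the real stream this ends the repeat() loop
      this.subscription = null;
    }
  }
}

// Hand-driven stream used only to exercise the component here.
class ManualStream implements PriceStream {
  private listeners: Array<(data: PriceData) => void> = [];

  subscribe(next: (data: PriceData) => void): Unsubscribable {
    this.listeners.push(next);
    return {
      unsubscribe: () => {
        this.listeners = this.listeners.filter(l => l !== next);
      },
    };
  }

  emit(data: PriceData): void {
    this.listeners.forEach(l => l(data));
  }
}

const source = new ManualStream();
const component = new PriceComponent(source);
component.ngOnInit();
source.emit({ expires: 2, price: 7 }); // component.price becomes 7
component.ngOnDestroy();
source.emit({ expires: 2, price: 9 }); // ignored: no longer subscribed
```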
