Twois Twois - 4 years ago 90
TypeScript Question

Prevent pyramid with chaining multiple observable only when if it's complete

I use rxjs and my question is, is there any way if i want to call next function only if the observable completed? So instead of this:

this.Start(data).subscribe(
(data) => {
console.log('next');
},
(err) => {
console.log('err');
},
() => {
console.log('complete');

this.nextFunction() // this return with other observable
.subscribe(
(data) => {
console.log('next');
},
(err) => {
console.log('err');
},
() => {
console.log('complete');
this.nextFunction2(); // this returns with other observable
.subscribe(
(data) => {
console.log('next');
},
(err) => {
console.log('err');
},
() => {
console.log('complete');
this.nextFunction3()
.subscribe(...)
}
);
}
);
}
);


something like this:

this.Start(data).subscribe(
(data) => {
console.log('next');
},
(err) => {
console.log('err');
},
() => {
console.log('complete');
return other observable;
}
)
.then(
(data) => {
console.log('next');
},
(err) => {
console.log('err');
},
() => {
console.log('complete');
return other observable;
}
)
.then(
(data) => {
console.log('next');
},
(err) => {
console.log('err');
},
() => {
console.log('complete');
return other observable;
}
);


So i don't want to design my code like a pyramid.

Answer Source

You can use the static concat operator to compose an observable that subscribes to the observables one after the other:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/concat';
import 'rxjs/add/observable/defer';

Observable.concat(
  this.Start(data),
  Observable.defer(() => this.nextFunction()),
  Observable.defer(() => this.nextFunction2()),
  Observable.defer(() => this.nextFunction3()),
)
.subscribe();

Note that the defer operator is used to defer the calls that obtain subquent observables.

With some logging composed in:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/concat';
import 'rxjs/add/observable/defer';
import 'rxjs/add/operator/do';

Observable.concat(
  this.Start(data).do(
    (value) => { console.log("next: Start:", value); },
    (error) => { console.log("error: Start:", error); },
    () => { console.log("completed: Start"); }
  ),
  Observable.defer(() => this.nextFunction().do(
    (value) => { console.log("next: nextFunction:", value); },
    (error) => { console.log("error: nextFunction:", error); },
    () => { console.log("completed: nextFunction"); }
  )),
  Observable.defer(() => this.nextFunction2().do(
    (value) => { console.log("next: nextFunction2:", value); },
    (error) => { console.log("error: nextFunction2:", error); },
    () => { console.log("completed: nextFunction2"); }
  )),
  Observable.defer(() => this.nextFunction3().do(
    (value) => { console.log("next: nextFunction3:", value); },
    (error) => { console.log("error: nextFunction3:", error); },
    () => { console.log("completed: nextFunction3"); }
  )),
)
.subscribe();
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download