theMothaShip - 3 months ago 41
Javascript Question

RxJs subscription drops after one flatMap due to error

I have the following fiddle (a simplified version of my actual code).

https://jsfiddle.net/9uau0ya4/

const subject = new Rx.Subject();

const subscription = subject.flatMap(Rx.Observable.fromPromise).subscribe((value) => {
  console.log(value);
});

const promise1 = new Promise((resolve) => {
  resolve(1);
});

const promise2 = new Promise((resolve) => {
  resolve(2);
});

const promise3 = new Promise((resolve) => {
  resolve(3);
});

subject.onNext(promise1);
subject.onNext(promise2);
subject.onNext(promise3);


When I run that fiddle, I expect 1, 2, 3 to be logged to the console. Instead, 1 is logged correctly, but the next two onNext() calls on the subject produce this error:
Uncaught (in promise) TypeError: self._s.schedule is not a function

I've tried the above with native promises and with q.js promises, and both show the same behavior. I've looked through the docs, but I can't work out what's going on here.

Answer

This could be an issue with function binding. If you check this jsfiddle, it works fine. The difference between the two is that in the second one, this is explicitly bound to Rx.Observable, because fromPromise is called as a method on it.

I can't find an article that explains this properly, but basically: if you have an object with nested properties and b is a function, then calling obj.a.b(x) invokes b with this bound to obj.a. If you write obj.a.b without calling it, the expression just evaluates to a function reference that is detached from obj.a. When that reference is called later, this is no longer obj.a (it is undefined in strict mode, or the global object otherwise).
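To make the binding rule concrete, here is a minimal sketch in plain JavaScript with no Rx involved. The object obj and its members a and b are hypothetical names that only mirror the description above; b simply reports whether it was called with this equal to obj.a.

```javascript
// Hypothetical object; the names obj, a and b mirror the prose above.
const obj = {
  a: {
    b() {
      // `this` depends on how b is called, not on where it is defined.
      return this === obj.a;
    },
  },
};

// Called as a method: `this` is obj.a.
const viaMethodCall = obj.a.b();

// Detached reference: `this` is no longer obj.a when it is called.
const detached = obj.a.b;
const viaDetachedCall = detached();

// A wrapper function or bind() keeps the receiver intact.
const wrapper = (...args) => obj.a.b(...args);
const viaWrapper = wrapper();
const viaBind = obj.a.b.bind(obj.a)();

console.log(viaMethodCall, viaDetachedCall, viaWrapper, viaBind);
// true false true true
```

Passing Rx.Observable.fromPromise directly to flatMap is the detached case; wrapping it in a function is the wrapper case.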

const subject = new Rx.Subject();
const fromPromise = function (x) {
  return Rx.Observable.fromPromise(x);
};

const subscription = subject.flatMap(fromPromise).subscribe((value) => {
  console.log(value);
});

const promise1 = new Promise((resolve) => {
  resolve(1);
});

const promise2 = new Promise((resolve) => {
  resolve(2);
});

const promise3 = new Promise((resolve) => {
  resolve(3);
});

subject.onNext(promise1);
subject.onNext(promise2);
subject.onNext(promise3);
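The wrapper also differs from the original in a second way that may matter here: it forwards only its first argument. This is an assumption about RxJS 4 that is not confirmed in this thread, but if flatMap calls its selector with (value, index, source) and fromPromise accepts an optional scheduler as its second parameter, then the index would end up in the scheduler slot. That would fit the symptom: index 0 is falsy and falls back to the default, while 1 and 2 are not schedulers. The sketch below reproduces the pattern in plain JavaScript; fakeFromPromise is a hypothetical stand-in, not the real Rx code, and Array.prototype.map plays the role of flatMap.

```javascript
// Hypothetical stand-in for a factory with an optional second parameter,
// loosely modelled on fromPromise(promise, scheduler). Not the real Rx code.
function fakeFromPromise(value, scheduler) {
  const usedScheduler = scheduler || 'default';
  return { value, scheduler: usedScheduler };
}

// Array.prototype.map calls its callback with (element, index, array),
// so the index lands in the `scheduler` parameter.
const direct = ['p1', 'p2', 'p3'].map(fakeFromPromise);

// A wrapper forwards only the first argument, so the default is always used.
const wrapped = ['p1', 'p2', 'p3'].map((x) => fakeFromPromise(x));

console.log(direct.map((r) => r.scheduler));  // [ 'default', 1, 2 ]
console.log(wrapped.map((r) => r.scheduler)); // [ 'default', 'default', 'default' ]
```

Either way, wrapping the call as in the code above is a reasonable fix, since it both preserves this and drops the extra arguments.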