DanDan DanDan - 26 days ago 9
TypeScript Question

Dynamically filtering rxjs stream

I'm using RXJS and I'm looking to dynamically filter the data, but I'm having problems:

let numberSource: ReplaySubject<Number> = new ReplaySubject<Number>();
let numberFilter: BehaviorSubject<Number> = new BehaviorSubject<Number>(5);

let filteredData = numberSource.filter(n => n < numberFilter.value);
numberFilter.subscribe(newFilter => {
filteredData = numberSource.filter(n => n < newFilter);
filteredData.subscribe(console.log); // <- I think this is wrong
});

console.log("A");
filteredData.subscribe(console.log);

numberSource.next(1);
numberSource.next(10);
numberSource.next(100);

console.log("B");
numberFilter.next(50);


What I am doing is subscribing to numberSource, that is the data I am interested in displaying. I am also subscribing to numberFilter, because I want any changes to that to replay the subject, but I think I have done that wrong.

I am expecting to see:

A
1
B
1
10


I am seeing:

A
1
1
B
1
10


Can anyone help?

Answer

I think I understand what you're trying to do. You want to stack all value emitted by numberSource to be able to reemit and filter them when numberFilter changes.

The major problem in your implementation is that numberFilter is a BehaviorSubject which emits its default value (5 in this case) every time you subscribe to it which happens right on the line numberFilter.subscribe(newFilter => .... This callback subscribes to filteredData and then again right after console.log("A");. So you haven't even started emitting data to numberSource and you've already subscribed twice. That's why it gives you 1 two times.

Easy solution is to use classic Subject and remember to unsubscribe() the previous subscription to filteredData:

let numberSource: ReplaySubject<Number> = new ReplaySubject<Number>();
let numberFilter: Subject<Number> = new Subject<Number>();

var subscription;
numberFilter.subscribe(newFilter => {
  if (subscription) {
    subscription.unsubscribe();
  }

  subscription = numberSource.filter(n => n < newFilter)
    .subscribe(console.log);
});

numberFilter.next(5);

console.log("A");

numberSource.next(1);
numberSource.next(10);
numberSource.next(100);

console.log("B");
numberFilter.next(50);

See live demo: http://plnkr.co/edit/vOaD8tcWlLRdfzU14Ufw

Now it gives you the output you wanted.