Max Alcala - 1 month ago
Javascript Question

Rxjs - Consume API output and re-query when cache is empty

I'm trying to implement a version of this intro to RxJS (fiddle here) that, instead of picking a random object from the array returned by the API, consumes that array as a backpressure-controlled stream of objects.

Here's a portion of the code that produces a controlled Observable from the API response (full fiddle here):

var responseStream = requestStream.flatMap(function (requestUrl) {
  return Rx.Observable.fromPromise(fetch(requestUrl));
}).flatMap(function (response) {
  return Rx.Observable.fromPromise(response.json());
}).flatMap(function (json) {
  return Rx.Observable.from(json);
}).controlled();


I just log each emitted user with console.log, and use a click event stream to trigger the request() call on the controlled Observable:

responseStream.subscribe(function (user) {
  console.log(user);
});

refreshClickStream.subscribe(function (res) {
  responseStream.request(1);
});


There are about 50 user objects returned from the GitHub API, and I'd like to consume them one per click with backpressure (as seen above). However, once I've run out of user objects, I'd like to send another request through requestStream to make another API call, replenish responseStream, and continue providing user objects to console.log on each click. What would be the RxJS-friendly way to do so?

Answer

I'd do it similarly to the article example with combineLatest() although I wonder if there's an easier way than mine.

I'm requesting only 3 items at a time. The page size of 3 is hardcoded, so you'll want to modify this. I thought about making it universal, but that would require a Subject and make it much more complicated, so I stayed with this simple example.

Also, I'm using concatMap() to trigger fetching more data. An ordinary click, however, only triggers combineLatest(), which emits the next item from the array that was already fetched.

See live demo: https://jsfiddle.net/h3bwwjaz/11/

var refreshButton = document.querySelector('#main');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click')
  .startWith(0)
  .scan(function(acc, val, index) {
    return index;
  });

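var usersStream = refreshClickStream
  // Position of this click within the current batch of 3 users.
  .map(function(index) {
    return index % 3;
  })
  // Only continue when a batch boundary is reached, i.e. new data is needed.
  .filter(function(remainder) {
    return remainder === 0;
  })
  // Fetch the next batch of 3 users, starting at a random offset.
  .concatMap(function() {
    var randomOffset = Math.floor(Math.random() * 500);
    var url = 'https://api.github.com/users?since=' + randomOffset + '&per_page=3';
    return Rx.Observable.fromPromise(fetch(url))
      .flatMap(function(response) {
        return Rx.Observable.fromPromise(response.json());
      });
  })
  // Pair the latest batch with the latest click index and pick one user.
  .combineLatest(refreshClickStream, function(responseArray, index) {
    return responseArray[index % 3];
  })
  // Drop users that were already emitted.
  .distinct();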

usersStream.subscribe(function(user) {
  console.log(user);
});

I use refreshClickStream twice:

  • to emit the next item in the array in combineLatest()
  • to check whether this is the end of the array and we need to make another request (that's the map() operator followed by filter()).
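The two roles of the click index can be sketched in plain JavaScript without RxJS. This is only an illustration: shouldFetchNextPage, positionInPage and the pageSize parameter are hypothetical names that stand in for the hardcoded 3, and the click index is assumed to count up as 0, 1, 2, 3, ...

```javascript
// Hypothetical helpers, not part of RxJS. pageSize replaces the hardcoded 3.

// Role 1: decide whether this click index requires a new request
// (the map() + filter() pair in the answer).
function shouldFetchNextPage(clickIndex, pageSize) {
  return clickIndex % pageSize === 0;
}

// Role 2: pick the position inside the current array
// (the selector passed to combineLatest() in the answer).
function positionInPage(clickIndex, pageSize) {
  return clickIndex % pageSize;
}

// Print both values for the first few click indexes with a page size of 3.
for (var i = 0; i <= 6; i++) {
  console.log(i, shouldFetchNextPage(i, 3), positionInPage(i, 3));
}
```

With a page size of 3, a fetch would be due at indexes 0, 3 and 6, while the position cycles through 0, 1, 2.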

At the end, distinct() is required because a click where index % 3 === 0 actually triggers two emissions: one from downloading the new data, and one directly from combineLatest(). We want to ignore the latter because we don't want to iterate over the same data again. Thanks to distinct(), it is dropped and only the new values are passed on.
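The double emission can be replayed with a small plain-JavaScript simulation. This is a rough sketch and not RxJS itself: simulateEmissions and dropRepeats are hypothetical helpers, the click index is assumed to count up as 0, 1, 2, 3, ..., the stale value is assumed to arrive before the freshly downloaded one, and distinct() is assumed to drop a value that is the same object as one seen before.

```javascript
// Plain-JavaScript simulation with hypothetical helper names; no RxJS involved.

// Replays what combineLatest() would emit for a number of clicks,
// given the pages that successive requests would return.
function simulateEmissions(pages, clickCount, pageSize) {
  var emitted = [];
  var pageNumber = 0;
  var currentPage = pages[pageNumber];
  // The initial index 0 is paired with the first downloaded page.
  emitted.push(currentPage[0]);
  for (var index = 1; index <= clickCount; index++) {
    // Every click is paired with whichever page is current at that moment.
    emitted.push(currentPage[index % pageSize]);
    if (index % pageSize === 0) {
      // The same click also triggered a request; once the new page arrives,
      // it is paired with the unchanged index and emitted as well.
      pageNumber += 1;
      currentPage = pages[pageNumber];
      emitted.push(currentPage[index % pageSize]);
    }
  }
  return emitted;
}

// Stand-in for distinct(): keep only the first occurrence of each object.
function dropRepeats(values) {
  var seen = new Set();
  return values.filter(function (value) {
    if (seen.has(value)) {
      return false;
    }
    seen.add(value);
    return true;
  });
}

function logins(users) {
  return users.map(function (user) { return user.login; }).join(',');
}

var pages = [
  [{ login: 'a' }, { login: 'b' }, { login: 'c' }],
  [{ login: 'd' }, { login: 'e' }, { login: 'f' }]
];
var raw = simulateEmissions(pages, 3, 3);
console.log(logins(raw));              // a,b,c,a,d
console.log(logins(dropRepeats(raw))); // a,b,c,d
```

The third click re-emits user a from the old page before user d arrives, which is the repeat that distinct() filters out.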

I tried to figure out a way to do this without distinct(), but I couldn't find one.