Steven Schoen - 1 year ago
Java Question

RxJava: Find out if BehaviorSubject was a repeated value or not

I'm making an Android interface that shows some data fetched from the network. I want it to show the latest available data and never be empty (unless no data has been fetched yet), so I'm using a BehaviorSubject to give subscribers (my UI) the latest available info while refreshing it in the background.

This works, but because of another requirement in my UI, I now need to know whether the published result came fresh from the network. (In other words, I need to know whether the published result was BehaviorSubject's saved item.)

How can I achieve this? If I need to split it up into multiple Observables, that's fine, as long as I keep the caching behavior of BehaviorSubject (getting the last available result) while also being able to tell whether the result came from the cache. A hacky way I can think of would be to check whether the timestamp of the response is relatively recent, but that would be really sloppy, and I'd rather figure out a way to do it with RxJava.

Answer

As you mentioned in the question, this can be accomplished with multiple Observables. In essence, you have two Observables: "the fresh response can be observed", and "the cached response can be observed". If something can be "observed", you can express it as an Observable. Let's name the first one original and the second replayed.

See this JSBin (it's JavaScript, but the concepts translate directly to Java; as far as I know there is no JavaBin for this purpose):

var original = Rx.Observable.interval(1000)
  .map(function (x) { return {value: x, from: 'original'}; })
  .publish().refCount();

var replayed = original
  .map(function (x) { return {value: x.value, from: 'replayed'}; })
  .replay(null, 1).refCount();

var merged = Rx.Observable.merge(original, replayed)
  .replay(null, 1).refCount()
  .distinctUntilChanged(function (obj) { return obj.value; });

console.log('subscribe 1st');
merged.subscribe(function (x) {
  console.log('subscriber1: value ' + x.value + ', from: ' + x.from);
});

// Subscribe a second observer later to see the cached item arrive first.
setTimeout(function () {
  console.log('    subscribe 2nd');
  merged.subscribe(function (x) {
    console.log('    subscriber2: value ' + x.value + ', from: ' + x.from);
  });
}, 2500);

The overall idea is to annotate each event with a from field indicating its origin. If from is 'original', it's a fresh response; if it's 'replayed', it's a cached response. Observable original only ever emits from: 'original', and Observable replayed only ever emits from: 'replayed'. In Java this takes a bit more boilerplate, because you need a class to represent these annotated events. Otherwise, the operators used here in RxJS also exist in RxJava.
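As a rough idea of that boilerplate, here is a minimal sketch of what such an annotated event class might look like in plain Java. The names Origin, Annotated, asReplayed and isFresh are made up for illustration; they are not part of RxJava, and the RxJava wiring itself is left out.

```java
import java.util.Objects;

/** Where an event came from: straight from the source, or from the replay cache. */
enum Origin { ORIGINAL, REPLAYED }

/** Hypothetical Java counterpart of the {value, from} object in the JavaScript example. */
final class Annotated<T> {
    final T value;
    final Origin from;

    Annotated(T value, Origin from) {
        this.value = Objects.requireNonNull(value, "value");
        this.from = Objects.requireNonNull(from, "from");
    }

    /** Same value, re-tagged as cached (the job of the map() applied to replayed). */
    Annotated<T> asReplayed() {
        return new Annotated<>(value, Origin.REPLAYED);
    }

    /** True when the event was tagged as a fresh response. */
    boolean isFresh() {
        return from == Origin.ORIGINAL;
    }

    @Override
    public String toString() {
        return "value " + value + ", from: " + from;
    }
}

class AnnotatedDemo {
    public static void main(String[] args) {
        Annotated<Long> fresh = new Annotated<>(0L, Origin.ORIGINAL);
        Annotated<Long> cached = fresh.asReplayed();
        System.out.println(fresh);  // value 0, from: ORIGINAL
        System.out.println(cached); // value 0, from: REPLAYED
    }
}
```

In the UI, a subscriber could then branch on isFresh() (or on the from field directly) to decide how to present the result.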

The original Observable is publish().refCount() because we want only one instance of this stream, to be shared with all observers. In fact in RxJS and Rx.NET, share() is an alias for publish().refCount().

The replayed Observable is replay(1).refCount() because it is also shared just like the original one is, but replay(1) gives us the caching behavior.

The merged Observable contains both original and replayed, and it is what you should expose to all subscribers. Since replayed emits immediately whenever original does, we use distinctUntilChanged on the event's value to drop the immediate consecutive duplicate. We also apply replay(1).refCount() to merged because the merge of original and replayed should likewise be a single stream instance shared among all observers. We would have used publish().refCount() for this, but that would lose the replay effect that replayed provides, hence replay(1).refCount() rather than publish().refCount().
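To make the effect of distinctUntilChanged on the value more concrete, here is a small plain-Java simulation. It does not use RxJava: the distinctUntilChanged helper below is a hand-written list version of the idea, the Event class is hypothetical, and the two input sequences are assumptions about the order in which events arrive (original first, then its replayed copy, with a late subscriber seeing the cached item first).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class DistinctByValueDemo {
    /** Hypothetical annotated event: a value plus a tag saying where it came from. */
    static final class Event {
        final long value;
        final String from; // "original" or "replayed", as in the JavaScript example

        Event(long value, String from) {
            this.value = value;
            this.from = from;
        }

        @Override
        public String toString() {
            return value + "/" + from;
        }
    }

    /** Keeps an item only if its key differs from the key of the item just before it. */
    static <T, K> List<T> distinctUntilChanged(List<T> items, Function<T, K> keySelector) {
        List<T> kept = new ArrayList<>();
        boolean first = true;
        K previousKey = null;
        for (T item : items) {
            K key = keySelector.apply(item);
            if (first || !Objects.equals(key, previousKey)) {
                kept.add(item);
            }
            previousKey = key;
            first = false;
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed input for a subscriber present from the start:
        // each value arrives twice, first from original and then from replayed.
        List<Event> early = Arrays.asList(
            new Event(0, "original"), new Event(0, "replayed"),
            new Event(1, "original"), new Event(1, "replayed"));

        // Assumed input for a late subscriber: the cached item first, then live pairs.
        List<Event> late = Arrays.asList(
            new Event(1, "replayed"),
            new Event(2, "original"), new Event(2, "replayed"));

        System.out.println(distinctUntilChanged(early, e -> e.value)); // [0/original, 1/original]
        System.out.println(distinctUntilChanged(late, e -> e.value));  // [1/replayed, 2/original]
    }
}
```

Under those assumptions, the early subscriber only ever sees events tagged original, while the late subscriber sees one replayed event (the cached one) followed by original events.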