Leon Zhou Leon Zhou - 1 month ago 14
C# Question

Getting the latest item in an observable sequence using RX in C#

take the following as an example:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();


What I'm trying to achieve here is to obtain the value of the latest item in the sequence at any given time "synchronously". Which means extensions like
FirstAsync
can't make it up for me.

The
StartWith
and
Replay
bit ensures that there will always be a value, and the
RefCount
bit is necessary in my actual code to detect when I can do some disposal actions.

So to simulate this "any given time" part, let's try getting the latest value after 5 seconds:

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
});


So with a 5 second delay, I need to get the value
5
out of the sequence and these are what I have tried so far with no success:


  1. ob.First()
    - returns 500

  2. ob.Latest().Take(1)
    - same as above

  3. ob.MostRecent(-1).First()
    - same as above

  4. ob.MostRecent(-1)
    - gives me an
    IEnumerable<long>
    full of "500"

  5. ob.Last()
    - never returns because it's waiting for the sequence to complete which it never will

  6. ob.Latest().Last()
    - same as above

  7. ob.ToTask().Result
    - same as above

  8. ob.ToEnumerable()
    - same as above

  9. ob.MostRecent().Last()
    same as above



It seems there's not much resources around that people can actually do this. The closest I can find is this: "Rx: operator for getting first and most recent value from an Observable stream", but it is not a synchronous call after all (still using a subscription) so it doesn't work for me.

Does any body know if this is actually doable?

Answer

To point out why your code probably isn't working as you expect it to

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
//Note at this point `ob` has never been subscribed to,
// so the Reference-count is 0 i.e. has not be connected.

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.

    //Here we make our first subscription to the `ob` sequence.
    //  This will connect the sequence (invoke subscribe)
    //   which will
    //      1) invoke StartWith
    //      2) invoke onNext(500)
    //      3) invoke First()
    //      4) First() will then unsubscribe() as it has the single value it needs
    //      5) The refCount will now return to 0
    //      6) The sequence will be unsubscribed to.
    ob.First().Dump();  

    //Any future calls like `ob.First()` will thus always get the value 500.
});

Potentially what you want is

var ob = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish(500);
var connection = ob.Connect();
//Note at this point `ob` has never been subscribed to, so the ReferenceCount is 0 i.e. has not be connected.

var subscription = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.
    ob.First().Dump();
});

//Sometime later
subscription.Dispose();
connection.Dispose()

HOWEVER, You really don't want to be mixing Synchronous calls with Rx. You also generally don't want to be subscribing within a subscription (as .First() is a subscription). What you probably mean to be doing is getting the latest value, and stashing it somewhere. Using .First() is just a slippery slope. You probably would be better writing something like

var subscription = Observable.Timer(TimeSpan.FromSeconds(5))
    .SelectMany(_=>ob.Take(1))
    .Subscribe(x =>
    {
        //Do something with X here.
        x.Dump();
    });