Wilka Wilka - 1 month ago 15
C# Question

With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running

Using Reactive Extensions, I want to ignore messages coming from my event stream that occur while my

Subscribe
method is running. I.e. it sometimes takes me longer to process a message than the time between message, so I want to drop the messages I don't have time to process.

However, when my
Subscribe
method completes, if any messages did come through I want to process the last one. So I always process the most recent message.

So, if I have some code which does:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);


and if we assume the '100' takes a long time to process. Then I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.

Here's an example of the result I want using a background task and
Latest()


var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});


However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).

I can also get the result I want by using a
BroadcastBlock
from TPL Dataflow, like this:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});


but this feels like it should be possible directly in Rx. What's the best way to go about doing it?

Answer

Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}