Veksi - 1 month ago
C# Question

Why is OnCompleted not called in this Suspendable implementation ("Rx Pausable")?

I read the following snippet from Ollie Riches' blog post Trying to be more functional with Rx and began to wonder the same thing as the author: why is OnCompleted not passed on? Could someone tell me what is happening here? Perhaps something embarrassingly simple?

The code is slightly modified and reproduced here for convenience (my apologies to Ollie if it wasn't acceptable to copy his code here):

public static class RxExtensions
{
    public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            var disposable = suspend.StartWith(initialState)
                .DistinctUntilChanged()
                .Select(s => s ? Observable.Empty<T>() : stream)
                .Switch()
                .Subscribe(o);

            return disposable;
        });
    }
}

var testScheduler = new TestScheduler();
var generatorCount = 10;

//If the hardcoded limit (11) in the result selector is changed to a value within the
//generated range (at most generatorCount), an exception is thrown and the exception
//variable below is set. Why doesn't the same happen with completed?
var generator = Observable.Generate(1,
    x => x <= generatorCount,
    x => x + 1,
    x => { if(x != 11) { Console.WriteLine(x); return x; } else { throw new ArgumentException(); } },
    x => TimeSpan.FromSeconds(1),
    testScheduler);


Exception exception = null;
var completed = false;
generator.Suspendable(new Subject<bool>()).Subscribe(_ => { }, e => exception = e, () => completed = true);
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1001000).Ticks);

Console.WriteLine(exception);
Console.WriteLine(completed);


For the record, I was trying to produce a stream that can be either paused or suspended, with the distinction that a paused stream accumulates events while a suspended one just skips them. It started to look a bit more involved than I expected, especially if one wants to put a limit or a "save strategy" on the paused part. Oh well...

Edit: Interestingly, I just noticed an RxJS implementation of Pausable.

Answer

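Through Switch, your observer is subscribed to both the suspend stream and the source stream. Switch completes only when the outer stream (suspend) and the current inner stream (the source) have both completed. In your test the suspend stream is a new Subject<bool>() that never completes, so the combined stream never completes either. Basically your source stream completes, but the Suspendable is waiting to see if any more pause/unpause signals will come through. If they do, it will resubscribe to the source stream. Errors are different: an error from the inner stream is forwarded immediately, which is why your exception does get set.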
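
To see this in isolation, here is a minimal sketch, assuming a reference to System.Reactive. It swaps the test scheduler for plain subjects so the ordering is explicit. The names SwitchCompletionDemo and Run are made up for illustration and are not from the question or the blog post.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchCompletionDemo
{
    // Reports whether OnCompleted had reached the observer before and after
    // the suspend subject itself completed.
    public static (bool Before, bool After) Run()
    {
        var suspend = new Subject<bool>();   // outer stream, stays open at first
        var source = new Subject<int>();     // stands in for the generator
        var completed = false;

        suspend.StartWith(false)
            .DistinctUntilChanged()
            .Select(s => s ? Observable.Empty<int>() : source)
            .Switch()
            .Subscribe(_ => { }, () => completed = true);

        source.OnNext(1);
        source.OnCompleted();        // the inner stream is done...
        var before = completed;      // ...but suspend is still open

        suspend.OnCompleted();       // now outer and inner have both completed
        var after = completed;

        return (before, after);
    }
}
```

Run() should report that completion is withheld while suspend is open and delivered once suspend completes, which matches what the test in the question observes.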

It is possible to have the pausable stream complete when the source stream completes, but that would probably defeat the purpose of your method: something has to stay subscribed to the source stream, even while suspended, and end the suspend stream when the source completes. You could do it with something like this, inside the Observable.Create callback (publishing the source shares a single subscription to it, and TakeUntil ends the suspend stream once the source has finished):

var shared = stream.Publish();
var pausable = suspend
    .StartWith(initialState)
    .TakeUntil(shared.LastOrDefaultAsync())
    .DistinctUntilChanged()
    // true means suspended, as in the question's Suspendable
    .Select(p => p ? Observable.Empty<T>() : shared)
    .Switch();
var disposable = new CompositeDisposable(pausable.Subscribe(o), shared.Connect());
return disposable;
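
For completeness, here is a sketch of how that fragment might be wrapped into a full extension method, assuming System.Reactive is referenced and keeping the question's convention that true means suspended. The names CompletingRxExtensions, SuspendableUntilCompleted and CompletingSuspendableDemo are hypothetical, chosen only to avoid clashing with the original Suspendable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompletingRxExtensions
{
    public static IObservable<T> SuspendableUntilCompleted<T>(
        this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            // Share one subscription to the source between Switch and TakeUntil.
            var shared = stream.Publish();
            var pausable = suspend
                .StartWith(initialState)
                // End the suspend stream once the source has completed.
                .TakeUntil(shared.LastOrDefaultAsync())
                .DistinctUntilChanged()
                .Select(s => s ? Observable.Empty<T>() : shared)
                .Switch();

            // Subscribe first, then connect, so no source events are missed.
            return new CompositeDisposable(pausable.Subscribe(o), shared.Connect());
        });
    }
}

public static class CompletingSuspendableDemo
{
    public static (bool Completed, int Count) Run()
    {
        var source = new Subject<int>();
        var seen = new List<int>();
        var completed = false;

        // The suspend subject never signals and never completes.
        source.SuspendableUntilCompleted(new Subject<bool>())
            .Subscribe(x => seen.Add(x), () => completed = true);

        source.OnNext(1);
        source.OnNext(2);
        source.OnCompleted();

        return (completed, seen.Count);
    }
}
```

The trade-off is the one mentioned above: the source stays subscribed even while the stream is suspended, so a cold source keeps running and its events are simply dropped during suspension.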