Ignacio Calvo - 1 month ago
C# Question

How to have events separated at least by a given time span?

I want the events of the output sequence to happen as soon as possible, but never within a window of N seconds that starts at the most recently emitted event.

This is a marble diagram, assuming I want a separation of at least three dashes between events:

Input:  a-------b-cd-----e---------f-----g-h
Result: a-------b---c---d---e------f-----g---h


The signature would be:

IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation);
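The timing rule in the diagram can be stated without Rx at all: each event goes out at the later of its own arrival time and the previous output time plus the separation. Below is a minimal sketch of just that rule on plain integer ticks, assuming "at least three dashes" means outputs at least four positions apart. `SeparationSketch` and `SeparateTimes` are hypothetical names, not part of the requested `Separate` signature.

```csharp
using System.Collections.Generic;

public static class SeparationSketch
{
    // Hypothetical helper, not part of the question's API. Works on plain integer
    // ticks rather than an IObservable so that only the timing rule is shown.
    public static long[] SeparateTimes(IReadOnlyList<long> arrivals, long separation)
    {
        var emitted = new long[arrivals.Count];
        long? previous = null; // emission time of the last output, if any

        for (int i = 0; i < arrivals.Count; i++)
        {
            long arrival = arrivals[i];

            // If the window opened by the previous output is still open, wait for it
            // to close; otherwise emit as soon as the event arrives.
            emitted[i] = previous is long p && arrival - p < separation
                ? p + separation
                : arrival;

            previous = emitted[i];
        }

        return emitted;
    }
}
```

Feeding it the positions from the Input line (a=1, b=9, c=11, d=12, e=18, f=28, g=34, h=36) with a separation of 4 gives 1, 9, 13, 17, 21, 28, 34, 38, which are the positions in the Result line.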

Answer

Thanks for a really interesting question. I took a stab at this - flying off into scheduling future actions - and, while I managed to hit the expected output, there were significant issues with my solution.

Yours is much cleaner but... ummm... wrong. Well, slightly ;0)

I started by writing the following test fixture using Microsoft's TestScheduler:

using System;
using System.Linq;
using Microsoft.Reactive.Testing;
using Xunit;

public class SeparateTests
{
    [Fact]
    public void MatchExpected()
    {
        TestScheduler scheduler = new TestScheduler();

        // Cold observable times are relative to the moment of subscription.
        // 0        1         2         3         4
        // 1234567890123456789012345678901234567890
        // a-------b-cd-----e---------f-----ghX     <- Input
        IObservable<char> input = scheduler.CreateColdObservable(
            ReactiveTest.OnNext(1, 'a'),
            ReactiveTest.OnNext(9, 'b'),
            ReactiveTest.OnNext(11, 'c'),
            ReactiveTest.OnNext(12, 'd'),
            ReactiveTest.OnNext(18, 'e'),
            ReactiveTest.OnNext(28, 'f'),
            ReactiveTest.OnNext(34, 'g'),
            ReactiveTest.OnNext(35, 'h'),
            ReactiveTest.OnCompleted<char>(36)
        );

        // 0        1         2         3         4
        // 1234567890123456789012345678901234567890
        // a-------b-cd-----e---------f-----ghX     <- Input
        // a-------b---c---d---e------f-----g---hX  <- Expected
        var expected = new[]
        {
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 1, 'a'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 9, 'b'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 13, 'c'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 17, 'd'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 21, 'e'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 28, 'f'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 34, 'g'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 38, 'h'),
            ReactiveTest.OnCompleted<char>(ReactiveTest.Subscribed + 38)
        };

        // A separation of 4 ticks corresponds to "at least three dashes" between events.
        var actual = scheduler.Start(() => input.Separate(TimeSpan.FromTicks(4), scheduler), ReactiveTest.Subscribed + 40);

        Assert.Equal(expected, actual.Messages.ToArray());
    }
}

The comments in the fixture show marble diagrams of the input and the expected output (using your original dash notation). Unfortunately, your implementation produces the following output:

// 0        1         2         3         4 
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX     <- Input
// a-------b---c---d---e------f-----g---hX  <- Expected
// -a-------b--c---d---e-------f-----g--hX  <- Actual

You see, the Delay overload that takes an observable to end each item's delay needs time on the scheduler before that observable can emit a value. So in instances where a value should be emitted immediately (x.delay == TimeSpan.Zero), it actually comes out one tick later, because even a zero-length timer has to make a loop through the scheduler.
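To see which items are affected, the sketch below replays the Scan accumulator used in the corrected operator on plain integer ticks (ticks stand in for DateTimeOffset/TimeSpan, and `DelaySketch`, `Delays` and `ImmediateIndices` are hypothetical names). With the fixture's input positions (1, 9, 11, 12, 18, 28, 34, 35) and a separation of 4, the delays come out as 0, 0, 2, 5, 3, 0, 0, 3. So a, b, f and g have zero delay, and those are exactly the four items that land one tick late in the Actual line, while the genuinely delayed c, d, e and h arrive on time.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class DelaySketch
{
    // Hypothetical helper mirroring the Scan accumulator: for each arrival it works
    // out the emission time and returns how long the item must be held back.
    public static long[] Delays(IReadOnlyList<long> arrivals, long separation)
    {
        var delays = new long[arrivals.Count];
        // Plays the role of DateTimeOffset.MinValue in the Scan seed; halved so that
        // "arrival - last" cannot overflow.
        long last = long.MinValue / 2;

        for (int i = 0; i < arrivals.Count; i++)
        {
            long time = arrivals[i] - last >= separation ? arrivals[i] : last + separation;
            delays[i] = time - arrivals[i];
            last = time;
        }

        return delays;
    }

    // Indices of items whose delay is zero, i.e. the items that should be passed
    // straight through rather than routed via a timer on the scheduler.
    public static int[] ImmediateIndices(IReadOnlyList<long> arrivals, long separation)
    {
        long[] delays = Delays(arrivals, separation);
        return Enumerable.Range(0, delays.Length).Where(i => delays[i] == 0).ToArray();
    }
}
```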

As I had the test fixture and you had the workable solution, I thought I'd post back a corrected version as shown below:

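using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Extension methods must live in a static class; the class name is arbitrary.
public static class SeparateExtensions
{
    public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation, IScheduler scheduler)
    {
        return Observable.Create<T>(
            observer =>
            {
                // For every item, work out when it may be emitted (the later of its own
                // timestamp and the previous emission time plus the separation) and how
                // long it must therefore be held back.
                var timedSource = source
                    .Timestamp(scheduler)
                    .Scan(
                        new
                        {
                            value = default(T),
                            time = DateTimeOffset.MinValue,
                            delay = TimeSpan.Zero
                        },
                        (acc, item) =>
                        {
                            var time =
                                item.Timestamp - acc.time >= separation
                                ? item.Timestamp
                                : acc.time.Add(separation);
                            return new
                            {
                                value = item.Value,
                                time,
                                delay = time - item.Timestamp
                            };
                        })
                    .Publish();

                // Items that may go out immediately bypass the scheduler entirely;
                // only items that must wait are routed through a Timer.
                var combinedSource = Observable.Merge(
                    timedSource.Where(x => x.delay == TimeSpan.Zero),
                    timedSource.Where(x => x.delay > TimeSpan.Zero).Delay(x => Observable.Timer(x.delay, scheduler))
                );

                // Subscribe the observer first, then connect the published source.
                return new CompositeDisposable(
                    combinedSource.Select(x => x.value).Subscribe(observer),
                    timedSource.Connect()
                );
            }
        );
    }
}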

This produces the expected output:

// 0        1         2         3         4 
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX     <- Input
// a-------b---c---d---e------f-----g---hX  <- Expected
// a-------b---c---d---e------f-----g---hX  <- Actual

Note the addition of the IScheduler parameter and its use throughout the operator code. This is good practice when implementing any Rx operator that can introduce concurrency (as this one does), and it allows you to write (extremely exacting) tests!

So there you go. Hope it helps :0)
