Alexey Zakharov - 1 year ago
C# Question

How do I implement polling using Observables?

I have a parametrized rest call that should be executed every five seconds with different params:

Observable<TResult> restCall = api.method1(param1);

I need to create an Observable<TResult> which will poll the restCall every 5 seconds with different values for param1. If the API call fails, I need to get an error and make the next call in 5 seconds. The interval between calls should be measured only from when restCall has finished (success/error).

I'm currently using RxJava, but a .NET example would also be good.

Answer Source


First, an admission, I'm a .NET guy, and I know this approach uses some idioms that have no direct equivalent in Java. But I'm taking you at your word and proceeding on the basis that this is a great question that .NET guys will enjoy, and that hopefully it will lead you down the right path in rx-java, which I have never looked at. This is quite a long answer, but it's mostly explanation - the solution code itself is pretty short!

Use of Either

We will need to sort some tools out first to help with this solution. The first is the use of the Either<TLeft, TRight> type. This is important because each call has two possible outcomes: either a good result or an error. We need to wrap both in a single type, since we can't use OnError to send errors back - that would terminate the result stream. Either looks a bit like a Tuple and makes it easier to deal with this situation. The Rxx library has a very full and good implementation of Either, but here is a simple generic example of usage followed by a simple implementation good for our purposes:

var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());

/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
    public abstract bool IsLeft { get; }
    public bool IsRight { get { return !IsLeft; } }
    public abstract TLeft Left { get; }
    public abstract TRight Right { get; }
}

public static class Either
{
    public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TLeft _leftValue;

        public LeftValue(TLeft leftValue)
        {
            _leftValue = leftValue;
        }

        public override TLeft Left { get { return _leftValue; } }
        public override TRight Right { get { return default(TRight); } }
        public override bool IsLeft { get { return true; } }
    }

    public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TRight _rightValue;

        public RightValue(TRight rightValue)
        {
            _rightValue = rightValue;
        }

        public override TLeft Left { get { return default(TLeft); } }
        public override TRight Right { get { return _rightValue; } }
        public override bool IsLeft { get { return false; } }
    }

    // Factory functions to create left or right-valued Either instances
    public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
    {
        return new LeftValue<TLeft, TRight>(leftValue);
    }

    public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
    {
        return new RightValue<TLeft, TRight>(rightValue);
    }
}

Note that by convention when using Either to model a success or failure, the Right side is used for the successful value, because it's "Right" of course :)

Some Helper Functions

I'm going to simulate two aspects of your problem with some helper functions. First, here is a factory to generate parameters - each time it is called it will return the next integer in the sequence of integers starting with 1:

// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
    return ++count;
}

Next, here is a function that simulates your REST call as an IObservable. This function accepts an integer and:

  • If the integer is even it returns an Observable that immediately sends an OnError.
  • If the integer is odd it returns an Observable that emits the integer concatenated with "-ret", but only after a second has passed. We will use this to check the polling interval is behaving as you requested - as a pause between completed invocations however long they take, rather than a regular interval.

Here it is:

// An asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
    return x % 2 == 0
        ? Observable.Throw<string>(new Exception())
        : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));
}

Now The Good Bit

Below is a reasonably generic reusable function I have called Poll. It accepts an asynchronous function that will be polled, a parameter factory for that function, the desired rest (no pun intended!) interval, and finally an IScheduler to use.

The simplest approach I could come up with is to use Observable.Create with a scheduler driving the result stream. ScheduleAsync is a way of scheduling that uses the .NET async/await form. This is a .NET idiom that allows you to write asynchronous code in an imperative fashion. The async keyword introduces an asynchronous function that can then await one or more asynchronous calls in its body and will continue on only when the call completes. I wrote a long explanation of this style of scheduling in this question, which includes the older recursive style that might be easier to implement in an rx-java approach. The code looks like this:

public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
    Func<TArg, IObservable<TResult>> asyncFunction,
    Func<TArg> parameterFactory,
    TimeSpan interval,
    IScheduler scheduler)
{
    return Observable.Create<Either<Exception, TResult>>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) => {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    var result = await asyncFunction(parameterFactory());
                    observer.OnNext(Either.Right<Exception, TResult>(result));
                }
                catch (Exception ex)
                {
                    observer.OnNext(Either.Left<Exception, TResult>(ex));
                }
                await ctrl.Sleep(interval, ct);
            }
        });
    });
}

Breaking this down, Observable.Create in general is a factory for creating IObservables that gives you a great deal of control over how results are posted to observers. It's often overlooked in favour of unnecessarily complex composition of primitives.

In this case, we are using it to create a stream of Either<Exception, TResult> so that we can return both the successful and the failed polling results.

The Create function accepts an observer that represents the Subscriber, to which we pass results via OnNext/OnError/OnCompleted. We need to return an IDisposable within the Create call - in .NET this is a handle by which the Subscriber can cancel their subscription. It's particularly important here because polling will otherwise go on forever - or at least it will never call OnCompleted.

The result of ScheduleAsync (or plain Schedule) is such a handle. When disposed, it will cancel any pending event we scheduled - thereby ending the polling loop. In our case, the Sleep we use to manage the interval is the cancellable operation, although the Poll function could easily be modified to accept a cancellable asyncFunction that accepts a CancellationToken as well.

The ScheduleAsync method accepts a function that will be called to schedule events. It is passed two arguments. The first, ctrl, is the scheduler itself. The second, ct, is a CancellationToken we can use to see if cancellation has been requested (by the Subscriber disposing their subscription handle).

The polling itself is performed via an infinite while loop that terminates only if the CancellationToken indicates cancellation has been requested.

In the loop, we can use the magic of async/await to asynchronously invoke the polling function yet still wrap it in an exception handler. This is so awesome! Assuming no error, we send the result as the right value of an Either to the observer via OnNext. If there was an exception, we send that as the left value of an Either to the observer. Finally, we use the Sleep function on the scheduler to schedule a wake-up call after the rest interval - not to be confused with a Thread.Sleep call, this one typically doesn't block any threads. Note that Sleep accepts the CancellationToken enabling that to be aborted as well!

I think you'll agree this is a pretty cool use of async/await to simplify what would have been an awfully tricky problem!
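
For readers porting this to another library, the shape of the loop can be sketched without Rx at all. The following is a minimal sketch using plain Tasks, not the Rx solution above: PollSketch, PollAsync and the callback parameters are hypothetical names, Task.Delay stands in for the scheduler's Sleep, and the polled call is assumed to accept a CancellationToken so that an in-flight request can be abandoned as well.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PollSketch
{
    // Repeatedly invokes `call` with a fresh argument, reports the outcome, and only
    // then waits `rest`, so the pause is measured from the completion of each call.
    public static async Task PollAsync<TArg, TResult>(
        Func<TArg, CancellationToken, Task<TResult>> call,
        Func<TArg> nextArg,
        TimeSpan rest,
        Action<TResult> onResult,
        Action<Exception> onError,
        CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            try
            {
                onResult(await call(nextArg(), ct));
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                return; // cancelled while the call was in flight
            }
            catch (Exception ex)
            {
                onError(ex); // report the failure and keep polling
            }

            try
            {
                await Task.Delay(rest, ct);
            }
            catch (OperationCanceledException)
            {
                return; // cancelled during the rest interval
            }
        }
    }
}
```

The two callbacks play the role of the right and left sides of the Either: a failure is reported as an ordinary value and the loop carries on, rather than the failure ending the sequence.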

Example Usage

Finally, here is some test code that calls Poll, along with sample output - for LINQPad fans all the code together in this answer will run in LINQPad with Rx 2.1 assemblies referenced:

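void Main()
{
    var subscription = Poll(SomeRestCall, ParameterFactory,
                            TimeSpan.FromSeconds(5), ThreadPoolScheduler.Instance) // any IScheduler will do
        .TimeInterval() // tag each result with the time since the previous one
        .Subscribe(x => {
            Console.Write("Interval: " + x.Interval);
            var result = x.Value;
            if (result.IsRight)
                Console.WriteLine(" Success: " + result.Right);
            else
                Console.WriteLine(" Error: " + result.Left.Message);
        });

    Console.ReadLine();      // keep polling until Enter is pressed
    subscription.Dispose();  // cancels the polling loop
}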


Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.

Note the interval between results is either 5 seconds (the polling interval) if an error was immediately returned, or 6 seconds (the polling interval plus the simulated REST call duration) for a successful result.

EDIT - Here is an alternative implementation that doesn't use ScheduleAsync, but uses old style recursive scheduling and no async/await syntax. As you can see, it's a lot messier - but it does also support cancelling the asyncFunction observable.

    public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
        Func<TArg, IObservable<TResult>> asyncFunction,
        Func<TArg> parameterFactory,
        TimeSpan interval,
        IScheduler scheduler)
    {
        return Observable.Create<Either<Exception, TResult>>(
            observer =>
                {
                    var disposable = new CompositeDisposable();
                    var funcDisposable = new SerialDisposable();
                    bool cancelRequested = false;
                    disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
                    // Disposing the subscription also cancels the in-flight asyncFunction call
                    disposable.Add(funcDisposable);
                    disposable.Add(scheduler.Schedule(interval, self =>
                        {
                            funcDisposable.Disposable = asyncFunction(parameterFactory())
                                .Finally(() =>
                                    {
                                        // Reschedule only once the call has finished
                                        if (!cancelRequested) self(interval);
                                    })
                                .Subscribe(
                                    res => observer.OnNext(Either.Right<Exception, TResult>(res)),
                                    ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
                        }));

                    return disposable;
                });
    }


See my other answer for a different approach that avoids .NET 4.5 async/await features and doesn't use Schedule calls.

I do hope that is some help to the rx-java guys!
