Yossarian - 3 months ago
C# Question

C# Rx - ignoring error

I have a stream of independent events that are handled asynchronously with Reactive Extensions. A handler may fail for whatever reason, but the stream should carry on with the next event.

However, in Rx, the subscription is automatically torn down as soon as an error occurs. Is this somehow configurable?

Example:

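async Task<Unit> ActionAsync(int i)
{
    // Fail only for 2, so that 3 can still be processed afterwards.
    if (i == 2)
        throw new Exception();

    i.Dump(); // LINQPad's Dump() extension prints the value
    return Unit.Default;
}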

void Main()
{
    var sb = new Subject<int>();

    sb.SelectMany(ActionAsync).Subscribe(
        _ => { },
        ex =>
        {
            ex.Dump();
        }
    );

    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}


I'd like to get the following output:

  • 1
  • Exception
  • 3

Can I achieve this without a try/catch inside ActionAsync?

Answer

Rx has a behavioural contract: a stream must follow the grammar OnNext*(OnError|OnCompleted)?. In other words, zero or more OnNext notifications, followed by at most one OnError or OnCompleted, after which nothing more is emitted.

So no, you can't configure this in Rx. It would no longer be Rx if you could.
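The contract can be observed directly on a Subject. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name ContractDemo and the "boom" message are made up for illustration. It pushes a value, an error, and then another value, and records what the subscriber actually receives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the original question or answer.
public static class ContractDemo
{
    // Pushes a value, an error, then more notifications into a Subject and
    // records every notification the subscriber actually receives.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        subject.Subscribe(
            x => log.Add($"OnNext({x})"),
            ex => log.Add($"OnError({ex.Message})"),
            () => log.Add("OnCompleted"));

        subject.OnNext(1);
        subject.OnError(new Exception("boom"));
        subject.OnNext(2);      // ignored: the sequence has already terminated
        subject.OnCompleted();  // ignored for the same reason

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Only OnNext(1) and OnError(boom) reach the subscriber; everything pushed after the error is dropped, which is why the original query stops at the first failing item.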

What you can do, however, is write a query that can retry the source.

If you write your code like this:

async Task<int> ActionAsync(int i)
{
    if (i == 2)
        throw new Exception();

    return i;
}

void Main()
{
    var sb = new Subject<int>();

    sb
        .SelectMany(ActionAsync)
        .Do(_ => { }, ex => ex.Dump())  // log the error before Retry swallows it
        .Retry()                        // resubscribe to sb after every error
        .Subscribe(x => x.Dump());

    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}

Then you do get:

1
Exception of type 'System.Exception' was thrown. 
3

Regarding your comment about performance: there aren't any performance issues with using .Retry(), but there is a behavioural issue.

If the source were cold - like var sb = new [] { 1, 2, 3 }.ToObservable(); - then each .Retry() would resubscribe and replay the entire observable sequence from the start, resulting in an infinite sequence of:

1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
...

In your code the source is a hot observable (a Subject), so a resubscription only sees values pushed after the error and this doesn't happen.
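To watch the cold-source replay without running into an infinite loop, the retries can be bounded. This is a minimal sketch, assuming the System.Reactive package; ColdRetryDemo, the log strings, and the use of Retry(3) instead of Retry() are choices made for illustration, and ActionAsync here is a stand-in that returns a faulted task for 2 rather than the handler from the answer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original answer.
public static class ColdRetryDemo
{
    // Stand-in for the handler: a faulted task for 2, the value otherwise.
    static Task<int> ActionAsync(int i) =>
        i == 2
            ? Task.FromException<int>(new Exception("failed on 2"))
            : Task.FromResult(i);

    public static List<string> Run()
    {
        var log = new List<string>();
        using (var done = new ManualResetEventSlim())
        {
            // Cold source: every subscription replays 1, 2, 3 from the start.
            var cold = new[] { 1, 2, 3 }.ToObservable();

            cold
                .SelectMany(i => ActionAsync(i))
                .Do(_ => { }, ex => { lock (log) log.Add("error: " + ex.Message); })
                .Retry(3) // at most three subscriptions in total, then the error propagates
                .Subscribe(
                    x => { lock (log) log.Add(x.ToString()); },
                    ex => { lock (log) log.Add("gave up"); done.Set(); },
                    () => done.Set());

            // Block until the sequence terminates, in case delivery is asynchronous.
            done.Wait(TimeSpan.FromSeconds(5));
        }
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Each of the three subscriptions emits 1 and then fails on 2, so the value 3 is never reached; with an unbounded Retry() the same pattern would repeat forever.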

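If you wish to do this with a cold observable, you need to make it hot via .Publish(...), so that the retried query resubscribes to the shared, already-running source rather than restarting it. Like this: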

var sb = new[] { 1, 2, 3 }.ToObservable();

sb
    .Publish(sbp =>
        sbp
            .SelectMany(ActionAsync)
            .Do(_ => { }, ex => ex.Dump())
            .Retry())
    .Subscribe(_ => _.Dump());

Then the expected behaviour returns.
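A different technique from the Retry query, and not something the answer above proposes, is to contain each failure inside the per-item inner sequence so the outer stream never sees an error at all. The sketch below assumes the System.Reactive package; PerItemCatchDemo and the log strings are hypothetical, and ActionAsync is again a stand-in that returns a faulted task for 2. It wraps each call in Observable.FromAsync and uses Catch to replace a failed inner sequence with an empty one.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original answer.
public static class PerItemCatchDemo
{
    // Stand-in for the handler: a faulted task for 2, the value otherwise.
    static Task<int> ActionAsync(int i) =>
        i == 2
            ? Task.FromException<int>(new Exception("failed on 2"))
            : Task.FromResult(i);

    public static List<string> Run()
    {
        var log = new List<string>();
        using (var done = new ManualResetEventSlim())
        {
            var sb = new Subject<int>();

            sb
                .SelectMany(i =>
                    Observable.FromAsync(() => ActionAsync(i))
                        // Swallow the failure of this one item and emit nothing for it.
                        .Catch<int, Exception>(ex =>
                        {
                            lock (log) log.Add("error: " + ex.Message);
                            return Observable.Empty<int>();
                        }))
                .Subscribe(
                    x => { lock (log) log.Add(x.ToString()); },
                    () => done.Set());

            sb.OnNext(1);
            sb.OnNext(2);
            sb.OnNext(3);
            sb.OnCompleted();

            // Block until the outer sequence completes.
            done.Wait(TimeSpan.FromSeconds(5));
        }
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Because the error never leaves the inner sequence, the outer subscription stays alive and no resubscription is needed, so this variant behaves the same for hot and cold sources. The trade-off is that the error handling lives in the query rather than in a single Do/Retry pair.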