ZorgoZ - 1 year ago
C# Question

Rx: Wait for first item for a period of time

I would like to convert my legacy event-based method to an observable-based one, but I am quite new to Rx and I am stuck.

I have an event source, which is now exposed as an observable. At a certain point, I have to start a method that returns either the next matching element or null if it times out.

The event-based approach looks like this:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);

        EventHandler<ReaderEvent> localHandler = (o, e) =>
        {
            if (e.PlaceId == PlaceId)
            {
                result = e;
                cts.Cancel();
            }
        };

        ReaderEventHandler += localHandler;
        try
        {
            await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            //...
        }

        ReaderEventHandler -= localHandler;
    }

    return result;
}


As you can see, the idea is that the delay is cancelled either by the arrival of the event I am waiting for or by CancelAfter once the specified amount of time has elapsed. Quite clean.

Now, the Rx version:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;

    var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId);

    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);
        using (observable.Subscribe(x =>
        {
            result = x;
            cts.Cancel();
        }))
        {
            try
            {
                await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) { }
        }
    }
    return result;
}


Not so clean... even worse...
I have tried the Timeout extension too. But as this is a one-shot subscription, I still need to wait somehow before I dispose of the subscription. The only difference would be that OnError would cancel the local token, rather than the built-in CancelAfter mechanism.

Is there any better / more concise (more Rx-based) way to do this?

Thank you!

Answer Source

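You could try:

var values = await _OnReaderEvent.Where(r => r.PlaceId == PlaceId).Buffer(waitFor, 1).FirstAsync();
return values.FirstOrDefault();

Buffer(waitFor, 1) closes its buffer as soon as one matching item arrives or waitFor elapses, whichever comes first. FirstAsync() takes that first buffer, and FirstOrDefault() returns the event it holds, or null if the buffer is empty because the wait timed out.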
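
For completeness, here is a rough sketch of how the snippet above might be wrapped into the method from the question. It assumes the System.Reactive package; the ReaderWaiter class, its constructor, and the minimal ReaderEvent type are hypothetical stand-ins for whatever the real code looks like. Passing topLevelToken to ToTask and returning null on cancellation is my guess at mirroring the original behaviour, not something the question confirms.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal event type; the real ReaderEvent presumably has more members.
public class ReaderEvent
{
    public int PlaceId { get; set; }
}

// Hypothetical wrapper class, only here to make the sketch self-contained.
public class ReaderWaiter
{
    private readonly IObservable<ReaderEvent> _OnReaderEvent;
    private readonly CancellationToken topLevelToken;

    public ReaderWaiter(IObservable<ReaderEvent> source, CancellationToken token)
    {
        _OnReaderEvent = source;
        topLevelToken = token;
    }

    public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
    {
        try
        {
            var values = await _OnReaderEvent
                .Where(r => r.PlaceId == PlaceId)  // only events for this place
                .Buffer(waitFor, 1)                // closes after waitFor or after 1 item
                .FirstAsync()                      // take the first buffer only
                .ToTask(topLevelToken)             // unsubscribes if the token is cancelled
                .ConfigureAwait(false);

            // An empty buffer means the wait timed out, so this yields null.
            return values.FirstOrDefault();
        }
        catch (OperationCanceledException)
        {
            // Assumption: top-level cancellation returns null, as in the event-based version.
            return null;
        }
    }
}
```

With this shape there is no local CancellationTokenSource and no manual subscription to dispose; the subscription ends when the first buffer is delivered or the token is cancelled.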