mooglinux - 3 months ago
C# Question

Observing incoming websocket messages with Reactive Extensions?

I want to use LINQ to process events received via a WebSocket connection. This is what I have so far:

private static void Main()
{
    var rx = new ObservableCollection<JObject>();
    using (WebSocket ws = new WebSocket(WsEndpoint))
    {
        ws.OnMessage += Ws_OnMessage;

        ws.Connect();
        Console.ReadKey();
        ws.Close();
    }
}

private static void Ws_OnMessage(object sender, MessageEventArgs e)
{
    Console.WriteLine(e.Data);
}


The first thing that stumps me is how to turn ws.OnMessage into some sort of event stream. I cannot find any examples online for observing an external event source with Reactive Extensions. I intend to parse the messages into JSON objects, then filter and aggregate them.

Could someone provide an example of creating an observable from the websocket messages, and subscribing to it?

Answer

If you use the right namespace:

using System.Reactive.Linq;

...you should have an Observable.FromEvent method. This assumes you're using websocket-sharp:

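var eventStream = Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
    // EventHandler<T> takes (sender, e), so adapt it to the single-argument callback Rx supplies;
    // without this conversion FromEvent rejects two-parameter delegates at runtime.
    onNext => (sender, e) => onNext(e),
    handler => ws.OnMessage += handler,
    handler => ws.OnMessage -= handler);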

...and now you can subscribe to it as any other IObservable<MessageEventArgs>.
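To make the subscribing part concrete, here is a minimal sketch of the whole flow: wrap the event, parse each message, filter, and subscribe. It assumes websocket-sharp, the System.Reactive package, and Newtonsoft.Json (for JObject, as in the question). The endpoint URL, the "type" field, and the "trade" value are made up for illustration and are not from the question.

```csharp
using System;
using System.Reactive.Linq;
using Newtonsoft.Json.Linq;
using WebSocketSharp;

internal static class Program
{
    // Hypothetical endpoint; substitute your own.
    private const string WsEndpoint = "ws://localhost:8080/feed";

    private static void Main()
    {
        using (var ws = new WebSocket(WsEndpoint))
        {
            // Wrap the OnMessage event. The first lambda adapts the
            // (sender, e) handler shape to Rx's single-argument callback.
            var messages = Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
                onNext => (sender, e) => onNext(e),
                handler => ws.OnMessage += handler,
                handler => ws.OnMessage -= handler);

            // Parse each payload as JSON and keep only the messages of interest.
            // The "type" field and "trade" value are assumed for this example.
            var trades = messages
                .Select(e => JObject.Parse(e.Data))
                .Where(json => (string)json["type"] == "trade");

            // Subscribing is what attaches the handler to ws.OnMessage,
            // so subscribe before connecting. Disposing the subscription
            // detaches the handler again.
            using (trades.Subscribe(
                json => Console.WriteLine(json),
                error => Console.Error.WriteLine(error)))
            {
                ws.Connect();
                Console.ReadKey();
                ws.Close();
            }
        }
    }
}
```

Note that an exception thrown inside Select (for example, JObject.Parse on a malformed message) is delivered to the error callback and ends the subscription, so you may want to guard the parse if the feed is not guaranteed to be valid JSON. For the aggregation step, standard operators such as Buffer or Scan can be chained onto the same query.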
