NoPyGod - 18 days ago
C# Question

What is the proper way to create an Observable which reads a stream to the end?

I'm struggling here. Normally I'd read a book, but there aren't any yet. I've found countless examples of reading streams with Rx, but I'm finding it very hard to get my head around them.

I know I can use Observable.FromAsyncPattern to create a wrapper of the Stream's BeginRead/EndRead or BeginReadLine/EndReadLine methods.

But this only reads once -- when the first observer subscribes.

I want an Observable which will keep reading and pumping OnNext until the stream errors or ends.

In addition to this, I'd also like to know how I can then share that observable with multiple subscribers so they all get the items.

Answer

The solution is to use Observable.Create.

Here is an example which can be adapted for reading any kind of stream:

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {
        // The first lambda argument is the observer being subscribed, not a Subject.
        return Observable.Create<Command>(async (observer, token) =>
        {
            try
            {
                while (true)
                {
                    // The token is cancelled when the subscription is disposed.
                    if (token.IsCancellationRequested)
                    {
                        observer.OnCompleted();
                        return;
                    }

                    // This part can be changed to something like this:
                    // int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);
                    Command cmd = await reader.ReadCommandAsync();

                    observer.OnNext(cmd);
                }
            }
            catch (Exception ex)
            {
                try
                {
                    observer.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }
        }).Publish();
    }

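Don't forget to call Connect() on the returned IConnectableObservable. Publish() shares a single read loop among all subscribers, and nothing is read until Connect() is called, so attach every subscriber first.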
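To make the adaptation to a plain Stream and the Connect() step concrete, here is a minimal sketch. It assumes the System.Reactive package and C# 7.1 or later (for async Main). The names ToChunkObservable and StreamObservableExtensions are made up for this illustration and are not part of Rx or of the code above. It treats a ReadAsync result of 0 as end of stream and relies on Observable.Create calling OnCompleted when the async lambda finishes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;

public static class StreamObservableExtensions
{
    // Hypothetical helper: pushes each chunk read from the stream to the
    // observers until ReadAsync returns 0 (end of stream) or the read throws.
    public static IConnectableObservable<byte[]> ToChunkObservable(this Stream stream, int bufferSize = 4096)
    {
        return Observable.Create<byte[]>(async (observer, token) =>
        {
            var buffer = new byte[bufferSize];
            while (!token.IsCancellationRequested)
            {
                int read = await stream.ReadAsync(buffer, 0, buffer.Length, token);
                if (read == 0)
                {
                    break; // end of stream; OnCompleted follows when the lambda returns
                }

                // Copy the bytes so each subscriber sees a stable chunk.
                var chunk = new byte[read];
                Array.Copy(buffer, chunk, read);
                observer.OnNext(chunk);
            }
            // An exception escaping this lambda is forwarded as OnError.
        }).Publish();
    }
}

public static class Program
{
    public static async Task Main()
    {
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("hello"));
        var source = stream.ToChunkObservable(bufferSize: 2);

        var first = new List<string>();
        var second = new List<string>();
        var done = new TaskCompletionSource<bool>();

        // Attach both subscribers before connecting so neither misses a chunk.
        source.Subscribe(chunk => first.Add(Encoding.UTF8.GetString(chunk)));
        source.Subscribe(
            chunk => second.Add(Encoding.UTF8.GetString(chunk)),
            ex => done.TrySetException(ex),
            () => done.TrySetResult(true));

        // Connect() starts the single shared read loop.
        using (source.Connect())
        {
            await done.Task;
        }

        Console.WriteLine(string.Join("|", first));  // prints "he|ll|o"
        Console.WriteLine(string.Join("|", second)); // prints "he|ll|o"
    }
}
```

Disposing the value returned by Connect() stops the shared read loop for every subscriber, so hold on to it for as long as the stream should keep being read.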
