Cocowalla Cocowalla -4 years ago 87
C# Question

RX Subject, group/correlate until a condition is met

I have a stream of items, and I want to create a subscription that groups by a property until a condition is met.

For example, let's say I want to group EventItem objects by Name until a specific Id is observed. The point of this is to correlate events until we see a specific Id that signals that there will be no more events to correlate. Then I want to perform some action with each group of correlated events.

public class EventItem
{
    public int Id { get; set; }
    public string Name { get; set; }
}

// Using a Subject since it seems the simplest way
Subject<EventItem> eventStream;
...

// A separate thread pushes EventItem objects into the Subject
eventStream.OnNext(eventItem);

// Struggling here...
IDisposable subscription = eventStream.????


I've tried several combinations of GroupBy, GroupByUntil, TakeUntil, TakeWhile, etc., but I can't figure out how to do this (I'm rather inexperienced with Rx).

Answer

Does this work?

        // Notification plays the role of EventItem from the question; it needs Id and Name properties.
        Subject<Notification> producer = new Subject<Notification>();

        // Share one subscription, so there's only one producer feeding both the groups and the duration
        var connectedProducer =
            producer
                .Publish()
                .RefCount();

        connectedProducer
            .GroupByUntil(
                item => item.Name,
                // Every open group is closed when an item with Id == 3 (the signal) arrives
                g => connectedProducer.Where(x => x.Id == 3))
            .SelectMany(g =>
            {
                // Collect the group's items into a list, which is emitted when the group closes
                return g.Aggregate<Notification, List<Notification>>(new List<Notification>(), (list, item) =>
                {
                    list.Add(item);
                    return list;
                });
            })
            // Not sure if you need this, but it's just a way to filter out the signal
            .Where(list => list.First().Id != 3)
            .Subscribe(results =>
            {
                // This will only run after a "3" is passed in; you then get a list of everything passed in for each name.
                // Not sure if you wanted intermediate results, or for it all to come through once the signal indicates processing.
            });

I based this on a previous answer of mine (Rx grouped throttling) and just modified it slightly.
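
For anyone who wants to try this against the EventItem type from the question, here is a minimal, self-contained sketch of the same GroupByUntil idea. It assumes the System.Reactive NuGet package is referenced. The Correlator class, the CorrelateUntil helper, the terminator Id of 3, and the sample events are illustrative assumptions, not part of the original question. It also assumes the terminating event carries its own Name ("end" here), so it never joins a group you care about.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class EventItem
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class Correlator
{
    // Hypothetical helper: groups events by Name and emits each group's items
    // as one list once an event with the terminator Id is observed.
    public static IObservable<IList<EventItem>> CorrelateUntil(
        IObservable<EventItem> source, int terminatorId)
    {
        // Share one subscription so the grouping and the duration selector
        // observe exactly the same sequence of events.
        var shared = source.Publish().RefCount();

        return shared
            .GroupByUntil(
                e => e.Name,
                // Each group stays open until the terminator shows up.
                g => shared.Where(e => e.Id == terminatorId))
            // ToList emits a single list when its group completes.
            .SelectMany(g => g.ToList())
            // Drop the group that the terminator event itself started.
            .Where(items => items[0].Id != terminatorId);
    }
}

public static class Program
{
    public static void Main()
    {
        var eventStream = new Subject<EventItem>();

        IDisposable subscription = Correlator
            .CorrelateUntil(eventStream, terminatorId: 3)
            .Subscribe(items => Console.WriteLine(
                $"{items[0].Name}: {string.Join(", ", items.Select(e => e.Id))}"));

        // Sample events (assumed data, for illustration only).
        eventStream.OnNext(new EventItem { Id = 1, Name = "a" });
        eventStream.OnNext(new EventItem { Id = 2, Name = "b" });
        eventStream.OnNext(new EventItem { Id = 4, Name = "a" });
        eventStream.OnNext(new EventItem { Id = 3, Name = "end" }); // terminator

        eventStream.OnCompleted();
        subscription.Dispose();
    }
}
```

With the sample events above, the subscriber should print "a: 1, 4" followed by "b: 2" as soon as the terminator arrives. The group opened by the terminator itself only completes when the stream completes, and the final Where discards it. If your terminator shares a Name with the events it closes, it will appear as the last item of that group's list instead.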
