Cocowalla Cocowalla -4 years ago 87
C# Question

RX Subject, group/correlate until a condition is met

I have a stream of items, and I want to create a subscription that groups by a property until a condition is met.

For example, let's say I want to group

s by Name, until a specific
is observed. The point of this is to correlate events until we see a specific
that signals that there will be no more events to correlate. Then I want to perform some action with each group of correlated events.

public class EventItem
public int Id { get; set }
public string Name { get; set; }

// Using a Subject since it seems the simplest way
Subject<EventItem> eventStream;

// A seperate thread pushes EventItem objects into the Subject

// Struggling here...
IDisposable subscription = eventStream.????

I've tried several combinations with
etc, but I can't figure out how to do this (I'm rather inexperienced with Rx)

Answer Source

Does this work?

        Subject<Notification> producer = new Subject<Notification>();

        //This way there's only one producer feeding the group and the duration
        var connectedProducer =

                item => item.Name,
                item => connectedProducer.Where(x=> x.Id == 3))
            .SelectMany(result =>
                return result.Aggregate<Notification, List<Notification>>(new List<Notification>(), (dict, item) =>
                    return dict;
            //not sure if you need this but just a way to filter out the signal
            .Where(item => item.First().Id != 3) 
            .Subscribe(results =>
                //This will only run after a "3" is passed in and then you get a list of everything passed in with the names
                //not sure if you wanted intermediate results or for it all to just come through once the signal indicates processing

I based this off of a previous answer of mine and just modified it slightly. Rx grouped throttling

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download