crazy_crank - 23 days ago
C# Question

Using Rx to merge multiple sources by key

I'm kinda new to Reactive Extensions, but since I have a very data-flow-heavy problem, I'm assuming it could massively simplify my implementation. It seems my problem is a bit more exotic than I anticipated, though.

Problem



I have multiple data sources, which all emit part of the data for the same entity. E.g. I have datasource1, which emits the first name of a person, and datasource2, which emits the last name of a person. The arrival of these data is completely unpredictable.

What I need to do now is observe both of those sources and use some kind of operator or subject that allows me to await both source observables. I only want to continue once both data sources have returned their specific part. Both my sources also pass a key for the data, so it's possible to link them together at a later point.

Is there a construct built into Rx that allows me to do that? Or is Rx simply the wrong toolset to solve my problem?

Answer

I can't judge whether Rx or async/await or TPL-Dataflow is a better solution, since that would probably depend on your larger application. Some reproducible code would really help.

Anyhow, here's an Rx solution. I'm assuming for now datasource1 and datasource2 are observables of different types, or easily convertible to observables of different types. If they were observables of the same type, this solution would also work, but you would have other options as well:

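// Requires the System.Reactive NuGet package.
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var firstNameSource = new Subject<FirstNameMessage>();
var lastNameSource = new Subject<LastNameMessage>();
var timeout = TimeSpan.FromSeconds(1); // Set to the length of time you are willing to wait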

var join = firstNameSource.Join(lastNameSource,
        fnm => Observable.Timer(timeout),
        lnm => Observable.Timer(timeout),
        (fnm, lnm) => new { FirstNameMessage = fnm, LastNameMessage = lnm }
    )
    .Where(a => a.FirstNameMessage.Id == a.LastNameMessage.Id)
    .Select(a => Tuple.Create(a.FirstNameMessage.Name, a.LastNameMessage.Name))
    .Timeout(timeout)
    .Catch(Observable.Empty<Tuple<string, string>>());

Using these sample classes:

public class FirstNameMessage
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public class LastNameMessage
{
    public int Id { get; set; }
    public string Name { get; set; }
}

Here's some sample subscription/execution code:

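// Run this inside an async method, since it uses await.
// Print each matched full name; report when the joined sequence completes.
join.Subscribe(
    t => Console.WriteLine($"{t.Item1} {t.Item2}"),
    () => Console.WriteLine("No more names!"));

// Id 1: both halves arrive together, so "John Smith" is printed.
firstNameSource.OnNext(new FirstNameMessage { Id = 1, Name = "John" });
lastNameSource.OnNext(new LastNameMessage { Id = 1, Name = "Smith" });

// Id 2: the halves arrive 500 ms apart, inside the 1 s window, so "Paul Jones" is printed.
lastNameSource.OnNext(new LastNameMessage { Id = 2, Name = "Jones" });
await Task.Delay(TimeSpan.FromMilliseconds(500));
firstNameSource.OnNext(new FirstNameMessage { Id = 2, Name = "Paul" });

// Id 3: the last name arrives 1500 ms after the first name, outside the window, so there is no match.
// Timeout fires during this delay and Catch completes the sequence ("No more names!").
firstNameSource.OnNext(new FirstNameMessage { Id = 3, Name = "Larry" });
await Task.Delay(TimeSpan.FromMilliseconds(1500));
lastNameSource.OnNext(new LastNameMessage { Id = 3, Name = "Fail" });

// Id 4: nothing is printed, because the subscription has already terminated.
firstNameSource.OnNext(new FirstNameMessage { Id = 4, Name = "Won't Work" });
lastNameSource.OnNext(new LastNameMessage { Id = 4, Name = "Subscription terminated" });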

Explanation:

The crucial part of this solution is the Join operator. Whereas a standard DB/LINQ Join joins things by key, Rx's Join joins by time window. So the Join above pairs any FirstNameMessage and LastNameMessage that arrive within the timeout timespan of each other, regardless of their Ids. Since we also want to join by key, the Where clause filters those pairs down to the ones whose Ids match.
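To see the time-window behaviour in isolation, here's a minimal sketch that is separate from the solution above. The names left, right and pairs are made up for illustration, and Observable.Never is used as the duration so that the windows never close. With no Where filter, every left value gets paired with every right value (needs the System.Reactive package and C# 9 top-level statements):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical sources, just to show how Join pairs values.
var left = new Subject<int>();
var right = new Subject<string>();
var pairs = new List<string>();

left.Join(right,
        _ => Observable.Never<long>(),   // left windows stay open forever
        _ => Observable.Never<long>(),   // right windows stay open forever
        (l, r) => $"{l}{r}")
    .Subscribe(p => pairs.Add(p));

left.OnNext(1);
right.OnNext("a");
left.OnNext(2);
right.OnNext("b");

// Every left value overlaps every right value, so all four combinations appear.
Console.WriteLine(pairs.Count); // 4
```

Swapping Observable.Never for Observable.Timer(timeout) closes each window after the timeout, which is what limits the pairing in the solution above.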

The Timeout and Catch calls at the end are possibly superfluous: they just serve to terminate the subscription once no further match arrives within the timeout. It sounds like your solution may just be waiting for one value, not multiple, so terminating like this may be what you need.
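If you really are waiting for just one value, one possible variation is sketched below. This is only a sketch under assumptions: the tuple-shaped messages and the names firstNames, lastNames and pending are made up for illustration and would need adapting to your real sources. It takes the first matching pair with FirstAsync and exposes it as a Task via ToTask, so it can be awaited (needs the System.Reactive package and C# 9 top-level statements):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical sources using tuples instead of the message classes.
var firstNames = new Subject<(int Id, string Name)>();
var lastNames = new Subject<(int Id, string Name)>();
var timeout = TimeSpan.FromSeconds(1);

// ToTask subscribes immediately, so it must be called before the values are pushed.
Task<string> pending = firstNames
    .Join(lastNames,
        _ => Observable.Timer(timeout),
        _ => Observable.Timer(timeout),
        (f, l) => (First: f, Last: l))
    .Where(p => p.First.Id == p.Last.Id)           // join by key
    .Select(p => $"{p.First.Name} {p.Last.Name}")
    .FirstAsync()                                  // complete after the first match
    .Timeout(timeout)                              // fail if no match arrives in time
    .ToTask();

firstNames.OnNext((1, "John"));
lastNames.OnNext((1, "Smith"));

Console.WriteLine(await pending); // John Smith
```

Note that without a Catch, a missed deadline surfaces as a TimeoutException when the task is awaited, so you'd need to handle that yourself.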
