TimCodes TimCodes - 2 years ago 90
Javascript Question

Observable Race Condition, how to time two Observables correctly

I have an obeservable sequence, where each time a call release event occurs I map that event to an http request that returns a callog json Array.

I need to combine the value emitting form the call event with the json array. However by the time the json array has returned the call event Observable has emitted a new value.

const agentCallEventStream = Rx.Observable.fromEvent(call realease event)
const agentCallLogStream = agentCallEventStream.flatMap( (agentObj) => {
return Rx.Observable.fromPromise(callLogHelper.getUserCallLogs(agentObj.agentId));

// I tried this
const callLogMerged = agentCallLogStream.combineLatest(agentCallEventStream)

// but the event data returned is newer than the call log data returned

Im looking for an operator or some way to keep the data in sync

Answer Source

If you want to merge the same agentObj that generated the promise, you can carry it in the selector function directly. You will find a good example from that SO's question : RxJs avoid external state but still access previous values

agentCallEventStream.flatMap( (agentObj) => 
                    .map(function (promiseValue){return {
                         promise : promiseValue, 
                         agentObj : agentObj

If that's not what you want, you should specify clearly in a marble diagram what are the input and expected outputs.

UPDATE : following paulpadniels comment, here is a shorter version using flatMap with result selector function :

    (agentObj) => Rx.Observable.fromPromise(callLogHelper.getUserCallLogs(agentObj.agentId)),
    (promiseValue) => {promise : promiseValue, agentObj : agentObj}

Truth is this signature of flatMap is rarely used, but it should be when possible as it is more performant (avoids creating extra inner observables and subscriptions).

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download