Dyna Dyna - 27 days ago 12
TypeScript Question

How to subscribe exactly once to an element from AsyncSubject (consumer pattern)

In

rxjs5
, I have an AsyncSubject and want to subscribe to it multiple times, but only ONE subscriber should ever receive the
next()
event. All others (if they are not yet unsubscribed) should immediately get the
complete()
event without
next()
.

Example:

let fired = false;
let as = new AsyncSubject();

const setFired = () => {
if (fired == true) throw new Error("Multiple subscriptions executed");
fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered

setTimeout(() => {
as.next(undefined);
as.complete();
}, 500);

Answer

The simples way is to wrap your AsyncSubject in another object that handles the logic of calling 1 subscriber only. Assuming you want to invoke the 1st subscriber only, the code below should be a good starting point

let as = new AsyncSubject();

const createSingleSubscriberAsyncSubject = as => {
    // define empty array for subscribers
    let subscribers = [];

    const subscribe = callback => {
        if (typeof callback !== 'function') throw new Error('callback provided is not a function');

        subscribers.push(callback);

        // return a function to unsubscribe
        const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
        return unsubscribe;
    };

    // the main subscriber that will listen to the original AS
    const mainSubscriber = (...args) => {
        // you can change this logic to invoke the last subscriber
        if (subscribers[0]) {
            subscribers[0](...args);
        }
    };

    as.subscribe(mainSubscriber);

    return {
        subscribe,
        // expose original AS methods as needed
        next: as.next.bind(as),
        complete: as.complete.bind(as),
    };
};

// use

const singleSub = createSingleSubscriberAsyncSubject(as);

// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));

// remove first subscriber
unsub1();

setTimeout(() => {
    as.next(undefined);
    as.complete();
    // only 'subscriber 2' is printed
}, 500);