David Aldridge David Aldridge - 3 months ago 50
Javascript Question

Convert infinite async callback sequence to Observable sequence?

Let's say I have the following asynchronous callback-based "infinite" sequence, which I cancel after some time:

'use strict';

const timers = require('timers');

let cancelled = false;

function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}

function cancellableSequence(callback) {
asyncOperation((error, processTime) => {
console.log('Did stuff');
if (!cancelled) {
process.nextTick(() => { cancellableSequence(callback); });
} else {
callback(null, processTime);
}
});
}

cancellableSequence((error, lastProcessTime) => {
console.log('Cancelled');
});

timers.setTimeout(() => { cancelled = true; }, 0);


The
asyncOperation
will execute and call back at least once, and the cancellation message will not display immediately, but rather after
asyncOperation
is complete. The number of calls to
asyncOperation
depends on the internal
delayMsec
value and the delay argument passed to
setTimeout()
at the end (an attempt to show that these are variable).

I'm starting to learn RxJS5, and thought it might be possible to convert this into an Observable sequence ("oooh, an Observable subscription can be unsubscribe()d - that looks neat!").

However, my attempts at turning
cancellableSequence
into an ES6 generator (how else to make infinite?) yielding
Observable.bindNodeCallback(asyncOperation)()
resulted in immediate yields, which in my case is undesired behavior.

I cannot use
Observable.delay()
or
Observable.timer()
, as I do not have a known, consistent interval. (The Math.random(...) in
asyncOperation
was an attempt to indicate that I as the caller do not control the timing, and the callback happens "some unknown time later.")

My failed attempt:

'use strict';

const timers = require('timers');
const Rx = require('rxjs/Rx');

function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
function* generator() {
while (true) {
console.log('Yielding...');
yield operationAsObservable();
}
}

Rx.Observable.from(generator()).take(2).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)


Which results is the output:

Yielding...
Taking 2698msec to process...
Yielding...
Taking 2240msec to process...
Process took: 2240msec
Process took: 2698msec
Complete


The yields occur right away. The
Process took: xxx
output occurs when you'd expect (after 2240 and 2698ms, respectively).

(In all fairness, the reason I care about the delay in between yields is that
asyncOperation()
here is in reality a rate-limiting token bucket library which controls the rate of asynchronous callbacks - an implementation which I'd like to retain.)

As an aside, I attempted to replace
take(2)
with a delayed cancellation, but that never occurred:

const subscription = Rx.Observable.from(generator()).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)

console.log('Never gets here?');
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);


Can what I'm attempting be accomplished with a cancellable subscription via RxJS? (I can see other approaches, such as
process.exec('node', ...)
to run
asyncOperation()
as a separate process, giving me the ability to
process.kill(..)
, etc., but let's not go there...).

Is my initial callback-based implementation the suggested way to implement a cancellable sequence?

UPDATED SOLUTION:

See my reply comment to @user3743222's answer below. Here's what I ended up with (replace ES6 generator with
Observable.expand()
):

'use strict';

const timers = require('timers');
const Rx = require('rxjs/Rx');

function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);

const subscription = Rx.Observable
.defer(operationAsObservable)
.expand(x => operationAsObservable())
.subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
);

subscription.add(() => {
console.log('Cancelled');
});

timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);

Answer

You seem to be repeating an action every time it finishes. That looks like a good use case for expand or repeatWhen.

Typically, that would be something like :

Rx.Observable.just(false).expand(_ => {  
  return cancelled ? Rx.Observable.empty() : Rx.Observable.fromCallback(asyncAction)
})

You put cancelled to true at any point of time and when the current action finishes, it stops the loop. Haven't tested it so I would be interested to know if that worked in the end.

You can have a look at similar questions about polling:

Documentation:

Documentation links are for Rxjs 4 but there should not be much changes vs v5