Basti Tee - 6 months ago
Javascript Question

How to create a background single-thread FIFO job queue in node.js

I am a JS/Node beginner and I am trying to get my head around the subject of "concurrency" in JavaScript. I think I am fairly comfortable with callbacks by now, but I don't think they are the way to go in my scenario. Basically, I have recurring expensive tasks (worker) that I need to process one by one while the main process continues its work. Here is a minimal test.

/*
 * We have a worker function that does some expensive task, e.g., an
 * I/O task or something else.
 */
var worker = function (jobId) {

    // It will take something between 1 and 2 seconds.
    var runtime = Math.floor((Math.random() * 1000) + 1000);
    console.log("started job #" + jobId + " (" + runtime + " ms)");

    // Then the worker will do something for a while ...
    setTimeout(
        function() {
            // .. and at some point it'll be finished.
            console.log("finished job #" + jobId);
        }, runtime
    );

};

/*
 * We obviously have a main process that meanwhile does other stuff,
 * like processing user interactions. In this case we call it until
 * some artificial tickets are used up.
 */
var mainprocess = function (tickets) {

    // Simulate some processing time ..
    var runtime = Math.floor((Math.random() * 500));
    setTimeout(
        function() {
            console.log("main process #" + tickets + " (" + runtime + " ms)");
            if (tickets > 0) {
                tickets--;
                mainprocess(tickets);
            }
        }, runtime
    );
};

// At some point in the code we create workers and we want to make sure
// they're processed in the *order of creation* and *one after another*
// without blocking the main process ...
for (var i = 1; i <= 10; i++) {
    worker(i);
}

// ... and then some other stuff will happen for a while!
mainprocess(10);

// ..


The code currently outputs something like:

started job #1 (1751 ms)
started job #2 (1417 ms)
...
started job #9 (1050 ms)
started job #10 (1864 ms)
main process #10 (142 ms)
main process #9 (228 ms)
main process #8 (149 ms)
main process #7 (88 ms)
main process #6 (410 ms)
finished job #9
finished job #5
main process #5 (265 ms)
finished job #2
main process #4 (270 ms)
finished job #7
finished job #3
finished job #1
...
main process #1 (486 ms)
main process #0 (365 ms)


I don't know how to change the code so that the main process continues while the workers are executed in the order of creation (currently they are only started in the correct order) and one after the other (currently they all run in parallel). The desired output would be:

started job #1 (1384 ms)
main process #10 (268 ms)
main process #9 (260 ms)
main process #8 (216 ms)
main process #7 (93 ms)
main process #6 (160 ms)
main process #5 (269 ms)
main process #4 (44 ms)
finished job #1
started job #2 (1121 ms)
main process #3 (172 ms)
main process #2 (170 ms)
main process #1 (437 ms)
finished job #2
started job #3 (1585 ms)
main process #0 (460 ms)
finished job #3
started job #4 (1225 ms)
finished job #4
started job #5 (1300 ms)
finished job #5


Any help would be appreciated.

Answer

Okay, I've figured it out. The extended and commented code uses JavaScript's built-in promises, but you could achieve the same thing with Q, Bluebird, or any other Node-compatible promise library. Note that jQuery's $.Deferred object is not available in the Node environment.

/*
 * We have a worker function  that does some expensive task, e.g., an
 * I/O task or something else.
 */
var worker = function (jobId) {

    // we need to put it into a new promise object
    return new Promise(function(resolve, reject) {

        // It will take something between 1 and 2 seconds.
        var runtime = Math.floor((Math.random() * 1000) + 1000);
        console.log("started job #" + jobId + " (" + runtime + " ms)");

        // Then the worker will do something for a while ...
        setTimeout(
            function() {
                // .. and at some point it'll be finished.
                console.log("finished job #" + jobId);
                // .. now we have to resolve the promise!!
                resolve("resolved job #" + jobId);
            }, runtime
        );

    });
};


/*
 * We obviously have a main process that meanwhile does other stuff,
 * like processing user interactions. In this case we call it until
 * some artificial tickets are used up.
 */
var mainprocess = function (tickets) {

    // Simulate some processing time ..
    var runtime = Math.floor((Math.random() * 500));
    setTimeout(
        function() {
            console.log("main process #" + tickets + " (" + runtime + " ms)");
            if (tickets > 0) {
                tickets--;
                mainprocess(tickets);
            }
        }, runtime
    );
}

// create a sequence with a resolved promise
var sequence = Promise.resolve();

// At some point in the code we create workers and we want to make sure
// they're processed in the *order of creation* and *one after another*
// without blocking the main process ...
for (var i = 1; i <= 10; i++) {
    // create an IIFE so that the current "i" gets its own
    // closure when it is used later (otherwise all job ids
    // would be "11" on invocation of the worker).
    (function() {
        var jobId = i;
        // add a new promise after the previous promise has resolved
        sequence = sequence.then(
            function(result) {
                // handle result later
                // returning the worker's promise appends it to the chain
                return worker(jobId);
            },
            function(err) {
                // handle error later
                // note: this runs instead of the worker if the previous
                // promise was rejected, so this job is skipped in that case
            }
        );
    })(); // END IIFE
}

// ... and then some other stuff will happen for a while!
mainprocess(10);

// ..

The output is as desired:

started job #1 (1384 ms)
main process #10 (268 ms)
main process #9 (260 ms)
main process #8 (216 ms)
main process #7 (93 ms)
main process #6 (160 ms)
main process #5 (269 ms)
main process #4 (44 ms)
finished job #1
started job #2 (1121 ms)
main process #3 (172 ms)
main process #2 (170 ms)
main process #1 (437 ms)
finished job #2
started job #3 (1585 ms)
main process #0 (460 ms)
finished job #3
started job #4 (1225 ms)
finished job #4
started job #5 (1300 ms)
finished job #5
...
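
The same chaining idea can be wrapped in a small reusable helper. What follows is only a minimal sketch under a few assumptions: createJobQueue and enqueue are made-up names (they do not come from any library), jobs are passed in as functions that return a promise, and a rejected job should not stop the jobs queued behind it.

```javascript
// Hypothetical helper (not part of the code above): a FIFO queue
// that runs at most one job at a time.
function createJobQueue() {
    // Tail of the promise chain; starts out already resolved.
    let tail = Promise.resolve();

    return {
        // `job` is a function that returns a promise (or a plain value).
        // The returned promise settles with that job's own outcome.
        enqueue(job) {
            // Start this job only after the previous one has settled.
            const result = tail.then(() => job());
            // Swallow the rejection on the chain only, so one failed job
            // does not prevent the jobs behind it from running. The
            // caller still sees the rejection through `result`.
            tail = result.catch(() => {});
            return result;
        }
    };
}

// Usage: three short jobs, queued in order of creation.
const queue = createJobQueue();
for (let jobId = 1; jobId <= 3; jobId++) {
    queue.enqueue(() => new Promise((resolve) => {
        console.log("started job #" + jobId);
        setTimeout(() => {
            console.log("finished job #" + jobId);
            resolve(jobId);
        }, 50);
    }));
}
```

Because let gives each loop iteration its own jobId binding, the IIFE is not needed in this variant. Each call to enqueue also returns a promise for that single job, so a caller can react to one job's result or error without touching the rest of the queue.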

I have to give big kudos to the following articles, which provided the insights: