Nicholas Kyriakides Nicholas Kyriakides - 3 months ago 16
Javascript Question

Serially processing a queue of messages whose processing is async

How can I process a series of messages that come through to a function, in the order that they come, when the functions to process the messages are operating on the messages asynchronously?

An example:

var processMessage = function(msg) {
switch (msg.action) {
case "doFoo":
this.processFoo(()=> {
// `foo` is done processing
});
break;
case "doBar":
this.processBar(()=> {
// `bar` is done processing
});
break;
case "doBaz":
this.processBaz(()=> {
// `baz` is done processing
});
break;
}
}


Notes




  • I can certainly push the items in an array and then use async eachSeries to process the array of messages

  • However, messages come constantly, thus filling the array with more items to be processed causing the processing to falter



Is there any de-facto/standard solution to this kind of problem?

Answer

Here's a general scheme:

  1. When new message arrives, check a flag to see if you are already in the middle of processing a message. If the flag is set, just add the message to the queue.
  2. If the flag is not set, check the queue and if there is a message in the queue then remove that message from the queue.
  3. When you start processing that message, set a flag that indicates you are now in the middle of processing a message.
  4. Start the async operation that processes the message
  5. When the callback occurs that signals the completion of the async message, clear the flag to indicate you're in the middle of processing and recursively call a function that starts at step 2 again.

You trigger processing of new messages at two points. First, when a new messages arrives and you aren't already processing a message and second when the processing of some other message completes, you check if anything else was added to the queue while you were processing.

You maintain an inProcessing type flag so you don't inadvertently start processing an incoming message when another message is already in the middle of being processed (this forces the serial execution you requested).

You have to rigorously deal with error conditions so the flag never gets stuck and so your queue processing never gets stalled.

In pseudo code (assuming these are methods on a queue object which contains an array as the queue):

addQueue: function(msg) {
    if (!this.inProcess) {
        // not currently processing anything so just process the message
        this.processMessage(msg);
    } else {
        this.queue.push(msg);
    }
},


processMessage: function(msg, completeFn) {
     var self = this;
     // must set this flag before going async
     self.inProcess = true;
     // asynchronously process this message
     someAsyncProcessing(msg, function(err) {
         self.inProcess = false;
         if (completeFn) {
             completeFn(err);
         }
         // see if anything else is in the queue to process
         if (self.queue.length) {
             // pull out oldest message and process it
             var msg = self.queue.shift();
             self.processMessage(msg);
         }
     });
}