xwk xwk - 3 months ago 26
Javascript Question

How to limit the concurrency of flatMap?

folks,

I'm trying to use RxJS to write a script to process several hundreds of log files, each of which is about 1GB. The skeleton of the script looks like

Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
return Rx.Node.fromReadStream(logFilePath)
.filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)


The code works, but notice that the filtering step of all log files will start concurrently. However, from file system IO performance perspective, it is preferable to process one file after another (or at least to limit the concurrency to a few files rather than opening all hundreds of files in the same time). In this regard, how can I implement it in a "functional reactive way"?

I had thought of scheduler but could not figure out how it can help here.

Answer

You can use .merge(maxConcurrent) to limit the concurrency. Because .merge(maxConcurrent) flattens a metaobservable (observable of observables) into an observable, you need to replace the .flatMap with .map so that the output is a metaobservable ("unflat"), then you call .merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath)
.map(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

This code hasn't been tested (since I don't have access to the development environment you have), but this is how to proceed. RxJS doesn't have many operators with concurrency parameters, but you can almost always do what you need with .merge(maxConcurrent).