MikeGomez MikeGomez - 5 months ago 77
Node.js Question

RxJs Basing a Filter on Another Observable

I have a list of words with pronunciation data in a text file. What I would like to do is have the user enter a word, and then have the program check to see if I have data on that word in that file. I'd like to do it in RxJs, which I am new to.

The code below is the closest I can get to what I want. Within my main stream I have a filter call that creates a dependent stream, called 'checkstream'. What I don't understand is how to use the results of that dependent stream in the filter method of my main stream. Currently that filter method fails, but I still get the data to the screen by console logging it.

If there is data in my text file for a word, then the checkstream will end up being an observable containing just the data I want to retrieve and show to the user. I want to somehow pipe that data down to the consumer of my main stream, but I don't understand how to do that.

I would appreciate any help you can provide. Even just some intuition would be useful.

var Rx = require('rx');
var RxNode = require('rx-node');
var fs = require('fs');
var split = require('split');

// Outer pipeline: read chunks from stdin and try to look each entered word up
// in ./dict.txt.
RxNode.fromReadableStream(process.stdin)
.map( (inputData) => {
// Convert the stdin chunk to a string and strip surrounding whitespace.
var inputWord = inputData.toString().trim();
return inputWord;
})
.map( (inputWord) => {

// A new read stream over the dictionary file is opened for every input word.
var checkStream = fs.createReadStream('./dict.txt');

// Inner pipeline: pipe the file through split(), JSON.parse each emitted
// piece, and keep the entries whose `word` matches the input.
RxNode.fromReadableStream(checkStream.pipe(split()))
.map( (lineFromFile) => {
return JSON.parse(lineFromFile);
})
.filter((parsedDataToCheck) => {
// Only the dictionary side is lower-cased; inputWord is compared as typed.
return parsedDataToCheck.word.toLowerCase().trim() === inputWord; })
// The match is only logged from this inner subscription; nothing here hands
// it back to the outer stream.
.subscribe((dataThatMatches) => { console.log(dataThatMatches) });

// NOTE(review): dataToReturn is not declared anywhere in this snippet, so
// unless it is defined elsewhere this line throws a ReferenceError instead of
// passing the matched data downstream.
return dataToReturn;
})
.subscribe(function(dataToReturn) {
console.log(dataToReturn);
});

Answer

Maybe something like this:

var Rx = require('rx');
var RxNode = require('rx-node');
var fs = require('fs');
var split = require('split');

// Look up every word entered on stdin in ./dict.txt (one JSON object per line,
// each with a `word` property) and print the first matching entry.
RxNode.fromReadableStream(process.stdin).map(
    // Normalize the input the same way the dictionary words are normalized
    // below, so that "Hello" and "hello" resolve to the same entry.
    inputData => inputData.toString().trim().toLowerCase()
).flatMap(inputWord => {
    // A fresh read stream per word: every lookup rescans the file from the start.
    const checkStream = fs.createReadStream('./dict.txt');
    return RxNode.fromReadableStream(
        checkStream.pipe(split())
    ).filter(
        // split() emits an empty string after a trailing newline (and for blank
        // lines); JSON.parse('') would throw and terminate the whole stream.
        lineFromFile => lineFromFile.trim() !== ''
    ).map(
        lineFromFile => JSON.parse(lineFromFile)
    ).find(
        // find() forwards only the first match to the outer stream
        // (presumably `undefined` when no line matches; confirm against the
        // RxJS version in use).
        parsedDataToCheck => parsedDataToCheck.word.toLowerCase().trim() === inputWord
    );
}).subscribe(dataToReturn => {
    console.log(dataToReturn);
});

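Note that the results may be printed in a different order than the words were entered on stdin: flatMap merges the inner lookups as they complete, and the asynchronous fs reads can finish out of order. If the order matters, use concatMap instead of flatMap.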

Comments