K.. K.. - 3 months ago 21
Node.js Question

Preserving context in streams

How do I preseve a context while not making it direct accessible in a stream?

For example:

I tried to model request/response with a stream that simply maps a response to a result. To enable a use like this:

requests
.filter(req => req.url == '/')
.map(req => {body: 'Hello, World!'})

requests
.filter(req => req.url == '/etc')
.map(req => {body: 'etc.'})


The problem I encountered was that the
http
module of Node.js got an interface like this:

server.on('request', (request, response) => ...)


So I would get a
request
object I could use in an
Observable
, but the
response
object is part of the context of a request, so the data I mapped a request to needs to led to the corresponding
response
object.

This would create a stream of requests only:

const requests = new Observable(o =>
server.on('request', r => o.next(r))
)


And I could map the request data to anything I liked.

But when I subscribe to the resulting data, how do I get it back to the right response?

Can I somehow create a "private" stream for request/response, "publicly" map out the response object from it to get a request only stream and then map the results of this stream back to the original one?

Something like this:

const private = new Observable(o =>
server.on('request', (request, response) => o.next({request, response})
)

const public = private.map(({request}) => request)

...

public.filter(r => r.url == '/').map(() => {body: 'Hello, World!'})

...

private
.matchUpWith(public)
.((privateData, publicData) =>
privateData.response.end(publicData.body)
)

Answer

If think just subscribing to your private stream with the public stream would achieve what you want.

const private$ = Rx.Observable.of(1,2,3,4,5);
const public$ = Rx.Subject.create({},private$);
    
public$.subscribe(i => console.log(i));
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.11/dist/global/Rx.umd.js"></script>

Another try with two streams:

const res = {
  send: (msg) => { console.log(msg); }
};

const one = { req: { url: '/', data: 'one' }, res };
const two = { req: { url: '/etc', data: 'two' }, res };
const three = { req: { url: '/', data: 'three' }, res };

const private$ = Rx.Observable.of(one,two, three)
  .concatMap(({req, res }) => {
    // public
    return Rx.Observable.of(req)
      .filter(r => r.url === '/')
      .map(r => ({body: `Hello, ${r.data}!`}))
      .map(p => ({ privateData: res, publicData:p }));
  })
  .subscribe(({privateData, publicData}) => {
    privateData.send(publicData.body);
  });
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.11/dist/global/Rx.umd.js"></script>


EDIT: Didn't really understood your question. But I leave my initial answer here:

If I understood you correctly you want to create a "reactive" server by using RxJS to handle incoming requests. This should be a good starting point:

import * as Rx from 'rxjs';
import * as http from 'http';

const request$ = new Subject();
const server = http
  .createServer((req, res) => request$.next({ req, res }))
  .listen(3000);

Then you can subscribe/filter/... the requests with RxJS:

request$
  .filter(req => req.url == '/')
  .subscribe(({ res }) => {
    res.writeHead(200, {"Content-Type": "text/html"});
    res.end('Hello, World!');
  });