K.. K.. - 1 year ago 107
Node.js Question

Preserving context in streams

How do I preserve a context in a stream without making it directly accessible?

For example:

I tried to model request/response with a stream that simply maps a request to a result, to enable a use like this:

.filter(req => req.url == '/')
.map(req => ({body: 'Hello, World!'}))

.filter(req => req.url == '/etc')
.map(req => ({body: 'etc.'}))

The problem I encountered was that the http module of Node.js has an interface like this:

server.on('request', (request, response) => ...)

So I would get a request object I could use in an Observable, but the response object is part of the context of a request, so the data I map a request to needs to be led back to the corresponding response.

This would create a stream of requests only:

const requests = new Observable(o =>
  server.on('request', r => o.next(r))
)

And I could map the request data to anything I liked.

But when I subscribe to the resulting data, how do I get it back to the right response?

Can I somehow create a "private" stream for request/response, "publicly" map out the response object from it to get a request only stream and then map the results of this stream back to the original one?

Something like this:

const private = new Observable(o =>
  server.on('request', (request, response) => o.next({request, response}))
)

const public = private.map(({request}) => request)


public.filter(r => r.url == '/').map(() => ({body: 'Hello, World!'}))


.((privateData, publicData) =>

Answer Source

I think just subscribing to your private stream with the public stream would achieve what you want.

const private$ = Rx.Observable.of(1,2,3,4,5);
const public$ = Rx.Subject.create({},private$);
public$.subscribe(i => console.log(i));
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.11/dist/global/Rx.umd.js"></script>

Another try with two streams:

const res = {
  send: (msg) => { console.log(msg); }
};

const one = { req: { url: '/', data: 'one' }, res };
const two = { req: { url: '/etc', data: 'two' }, res };
const three = { req: { url: '/', data: 'three' }, res };

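const private$ = Rx.Observable.of(one, two, three)
  .concatMap(({ req, res }) => {
    // public: this inner pipeline only ever sees the request
    return Rx.Observable.of(req)
      .filter(r => r.url === '/')
      .map(r => ({body: `Hello, ${r.data}!`}))
      // private: re-attach the response kept in the closure
      .map(p => ({ privateData: res, publicData: p }));
  })
  .subscribe(({privateData, publicData}) => {
    // the snippet is cut off here; presumably the result is sent
    // through the matching response
    privateData.send(publicData);
  });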
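For what it's worth, the idea behind this second snippet does not really depend on RxJS: the response stays in the closure of the outer callback, and the public code is only ever handed the request. Here is a minimal plain-JavaScript sketch of that. `hideResponse` and `route` are hypothetical names I made up for illustration, and the `res` mock plays the same role as the one above, except that it records messages instead of logging them.

```javascript
// Hypothetical helper (not part of RxJS or Node.js): takes a "public"
// function that only sees the request and returns a handler for
// { req, res } pairs. The response never leaves this closure.
function hideResponse(publicFn) {
  return ({ req, res }) => {
    const result = publicFn(req);   // public code: request only
    if (result !== undefined) {
      res.send(result);             // private code: route the result back
    }
  };
}

// Public route: returns a result for '/', undefined otherwise
// (undefined plays the role of being filtered out).
const route = req =>
  req.url === '/' ? { body: `Hello, ${req.data}!` } : undefined;

// Mock response that records what was sent.
const sent = [];
const res = { send: msg => sent.push(msg) };

const handle = hideResponse(route);
[
  { req: { url: '/', data: 'one' }, res },
  { req: { url: '/etc', data: 'two' }, res },
  { req: { url: '/', data: 'three' }, res },
].forEach(handle);

console.log(sent);
```

The `concatMap` version above does the same thing, with the inner Observable taking the place of `publicFn`.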

EDIT: I didn't really understand your question at first, but I'll leave my initial answer here:

If I understood you correctly, you want to create a "reactive" server by using RxJS to handle incoming requests. This should be a good starting point:

import * as Rx from 'rxjs';
import * as http from 'http';

// Subject is reached through the Rx namespace imported above
const request$ = new Rx.Subject();
const server = http
  .createServer((req, res) => request$.next({ req, res }));

Then you can subscribe/filter/... the requests with RxJS:

request$
  // each emitted value is a { req, res } pair, so destructure it
  .filter(({ req }) => req.url === '/')
  .subscribe(({ res }) => {
    res.writeHead(200, {"Content-Type": "text/html"});
    res.end('Hello, World!');
  });
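If you want to try a handler like this without opening a port: `http.createServer(listener)` registers the listener for the server's `'request'` event, so you can emit that event yourself with stand-in objects. This is a rough sketch only. The small `request$` object is my own stand-in for the Subject (it is not RxJS), and the fake request and response are made-up test doubles.

```javascript
const http = require('http');

// Stand-in for the Subject: a plain list of subscribers (assumption, not RxJS).
const subscribers = [];
const request$ = {
  next: value => subscribers.forEach(fn => fn(value)),
  subscribe: fn => subscribers.push(fn),
};

// Same wiring as above: every incoming request is pushed as a { req, res } pair.
const server = http.createServer((req, res) => request$.next({ req, res }));

request$.subscribe(({ req, res }) => {
  if (req.url !== '/') return;   // plays the role of .filter(...)
  res.writeHead(200, { 'Content-Type': 'text/html' });
  res.end('Hello, World!');
});

// Made-up test double that records calls; no real socket is involved.
const calls = [];
const fakeRes = {
  writeHead: status => calls.push(['writeHead', status]),
  end: body => calls.push(['end', body]),
};

// Emit the 'request' event by hand instead of listening on a port.
server.emit('request', { url: '/' }, fakeRes);
server.emit('request', { url: '/etc' }, fakeRes);

console.log(calls);
```

Only the request for `/` reaches the response; the one for `/etc` is dropped by the guard.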