K.. K.. - 18 days ago 7
Javascript Question

Modeling WebSocket streams with RxJS

What are the ways to model WebSocket streams with RxJS?

The obvious model I see is a stream of sockets, each of which emits a stream of messages.

If I create a stream of sockets, how can I create streams of their messages and still preserve who sent each message?

The socket stream was my first step:

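// Assumes RxJS 5 and the ws package.
import {Observable} from 'rxjs'
import {Server as WebSocketServer} from 'ws'

const socket$ = Observable.create(observer => {
  const server = new WebSocketServer({server: someHttpServer})

  // Emit each new connection. Calling observer.next explicitly keeps its
  // `this` binding, which destructuring {next, complete} would lose.
  server.on('connection', socket => observer.next(socket))

  // Teardown: stop accepting connections when the subscriber unsubscribes.
  return () => server.close()
})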


But the message stream is a bit harder, because I need the sockets I got the messages from.

This was my first naive attempt at modeling it:

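const message$ = socket$.flatMap(socket => Observable.create(observer => {
  // Forward every message from this socket and complete when it closes.
  socket.on('message', message => observer.next(message))
  socket.on('close', () => observer.complete())

  // Teardown: close the socket when the subscriber unsubscribes.
  return () => socket.close()
})).share()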


This gives an observable that streams all messages from all sockets. But when I subscribe to it, I no longer have the sockets, which makes it unidirectional.

I want

socket$ -> message$ -> server-processing -> socket$


But there are multiple use cases for the responses: broadcast, multicast, and unicast.

K.. K..
Answer

I found out that flatMap takes a second function (a result selector) that receives both the value passed into flatMap and each value emitted by the flattened inner observable. Whatever this function returns is the value that all later operators see.

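const socketMessage$ = socket$.flatMap(

  // Inner observable: the messages of one socket.
  socket => Observable.create(observer => {
    socket.on('message', message => observer.next(message))
    socket.on('close', () => observer.complete())

    // ws sockets expose close(), as used in the question.
    return () => socket.close()
  }),

  // Result selector: pair each message with the socket that sent it.
  (socket, message) => ({socket, message})

).share()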
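
With each message paired with its socket, the three response styles from the question could be built on top of the stream roughly like this. This is only a sketch under some assumptions: the names clients, unicast, broadcast, multicast and handle are made up for illustration, messages are assumed to be strings, the 'all:' prefix is an invented convention, and a socket is treated as anything with a send(data) method.

```javascript
// Hypothetical helpers: none of these names come from RxJS or ws.

// Currently connected sockets, so broadcast and multicast can reach them.
const clients = new Set()

// Unicast: reply only to the socket that sent the message.
const unicast = (socket, data) => socket.send(data)

// Broadcast: send to every connected socket.
const broadcast = data => clients.forEach(client => client.send(data))

// Multicast: send to the connected sockets that satisfy a predicate.
const multicast = (predicate, data) =>
  clients.forEach(client => {
    if (predicate(client)) client.send(data)
  })

// Example handler for one {socket, message} pair.
// Messages starting with 'all:' go to everyone, the rest are echoed back.
const handle = ({socket, message}) => {
  if (message.startsWith('all:')) {
    broadcast(message.slice(4))
  } else {
    unicast(socket, 'echo: ' + message)
  }
}
```

Under these assumptions, the handler would be attached with socketMessage$.subscribe(handle), and clients would be kept current by adding each socket emitted by socket$ and deleting it again on its 'close' event.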