Sorona Sorona - 1 month ago 13
Scala Question

Websocket - Sink.actorRefWithAck and Source.queue - only one request TO server gets processed?

Consider this

def handle = WebSocket.accept[Array[Byte], Array[Byte]]
{
request =>
log.info("Handling byte-message")
ActorFlow.actorRef
{
out => MyActor.props(out)
}
}


Whenever a byte message is sent to the websocket, it gets delegated to the actor and before I get a log entry.

Works fine.

Now the same logic, with a Flow instead

def handle = WebSocket.accept[Array[Byte], Array[Byte]]
{
request =>
{
log.info("Handling byte-message")
Flow.fromSinkAndSource(sink, source).log("flow")
}
}


I'll add the rest of the code:

val tickingSource: Source[Array[Byte], Cancellable] =
Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
.map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)

val myActor = system.actorOf(Props{new MyActor(null)}, "myActor")

val serverMessageSource = Source
.queue[Array[Byte]](10, OverflowStrategy.backpressure)
.mapMaterializedValue { queue => myActor ! InitTunnel(queue)}

val sink = Sink.actorRefWithAck(myActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())

val source = tickingSource.merge(serverMessageSource)


It has a keepAlive source, and an actual source, if the server wants to push something, merged.

The sink is again the actor.

Now the problem is, in this scenario I get EXACTLY one message from the client TO the server, even if it sends more, they do not get passed to
myActor


At first I thought this may be due to the
null
reference passed to
myActor
here, but then the first one could not be processed either. I am out of ideas, what is causing this. The flow itself works, I get the keepAlive messages just fine and if I refresh the client (Scala.js) again, first request gets sent just fine to the server and server responds and all is well

edit to clarify:

I am NOT talking about the log entry here - I am sorry, I had another log entry in
myActor
and got myself confused.

If the client sends more than one message the server does not handle it. It never reaches the actor, although the client definitely sends it :(

What I would expect:

1) At first message from client to server, the websocket gets created
2) The websocket is kept alive by the server, via the
tickingSource
(that actually works!)
3) If the client sends another request, it gets handled by
myActor
and that also responds to the client over the websocket

So, 3) does not work. In fact, the client sends a message, but that never reaches
myActor
after the initial one :(

edit:

This is my actor logic for initializing the websocket/stream in
myActor
:

var tunnel: Option[SourceQueueWithComplete[Array[Byte]]] = None

override def receive: Receive = {
case i: InternalMessages.InitTunnel =>
log.info("Initializing tunnel")
tunnel = Some(i.sourceQueue)

case _: InternalMessages.Init =>
sender() ! InternalMessages.Acknowledged()
log.info("websocket stream initialized")

case _: InternalMessages.Completed =>
log.info("websocket stream completed")

case q: Question => {
tunnel match {
case Some(t) => t offer Answer()...
case None => log.error("No tunnel available")
}
}
}


object InternalMessages {
case class Acknowledged()
case class Init()
case class Completed()
case class InitTunnel(sourceQueue: SourceQueueWithComplete[Array[Byte]])
}

Answer Source

I have the feeling that you don't send acks after receiving the Question message, but you should as the akka docs says (http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#sink-actorrefwithack): It also requires the given acknowledgement message after each stream element to make back-pressure work.