JasperT - 4 years ago
Scala Question

How to log flow rate in Akka Stream?

I have an Akka Streams application with a single flow/graph. I want to measure the flow rate at the source and log it every 5 seconds, like "received 3 messages in the last 5 seconds". I tried:

  // assumes: import scala.concurrent.duration._
  someOtherFlow
    .groupedWithin(Int.MaxValue, 5.seconds)
    .runForeach(seq =>
      log.debug(s"received ${seq.length} messages in the last 5 seconds")
    )


However, it only produces output when there are messages; a window with 0 messages does not emit an empty list. I want the 0s as well. Is this possible?

Answer Source

You could try something like

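  // assumes: import akka.NotUsed; import akka.stream.scaladsl.Source; import scala.concurrent.duration._
  src
    // seed with the first element so that it is counted too
    .conflateWithSeed(first => Seq(first))((acc, item) => acc :+ item)
    .zip(Source.tick(5.seconds, 5.seconds, NotUsed))
    .map(_._1.size)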

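which should batch your elements until the tick releases them. Note that `conflateWithSeed` emits nothing until at least one element has arrived, and `zip` waits for both sides, so a window with no elements delays the output rather than producing a 0.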
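If windows with no messages must be reported as 0, another option is to merge the ticks into the stream itself and count the messages seen between two ticks. The following is only a sketch, assuming Akka 2.6+ (where an implicit `ActorSystem` provides the materializer); `FlowRateSketch`, `Event`, `Msg`, `Tick`, `counter` and the demo source are hypothetical names made up for illustration, not part of Akka.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object FlowRateSketch {
  // Hypothetical event type: either an upstream message or a timer tick.
  sealed trait Event
  case object Msg extends Event
  case object Tick extends Event

  // Returns a fresh stateful function: Msg increments the counter and emits
  // nothing; Tick emits the current count (possibly 0) and resets it.
  def counter(): Event => List[Int] = {
    var count = 0
    val step: Event => List[Int] = {
      case Msg =>
        count += 1
        Nil
      case Tick =>
        val seen = count
        count = 0
        List(seen)
    }
    step
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("flow-rate")
    import system.dispatcher

    val window = 5.seconds

    // Stand-in for the real source (assumption): silent for 12 seconds,
    // then one message per second, 10 messages in total.
    val src = Source.tick(12.seconds, 1.second, "msg").take(10)

    src
      .map[Event](_ => Msg)
      // eagerComplete = true stops the stream once src completes,
      // since the tick source itself never completes
      .merge(Source.tick[Event](window, window, Tick), eagerComplete = true)
      .statefulMapConcat(() => counter())
      .runForeach(n => println(s"received $n messages in the last 5 seconds"))
      .onComplete(_ => system.terminate())
  }
}
```

Because every tick passes through the counter, a quiet window yields a 0 instead of being skipped. One limitation of this sketch: messages that arrive after the last tick and before completion are never reported.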

On a different note, if you need this for monitoring, you could use a third-party tool such as Kamon.
