Scala Question

How to add a cooldown/rate-limit to a stream in Kafka Streams?

I'm new to streaming data processing, and have what I feel like must be a very basic use case.

Let's say I have a stream of (User, Alert) tuples. What I want is to rate-limit that stream per user, i.e. I want a stream that only outputs an alert for a given user once. For the following, let's say, 60 minutes, any incoming alert for that user should simply get swallowed. After those 60 minutes, an incoming alert should trigger again.
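To make the intended behaviour concrete, here is a tiny, purely illustrative sketch of the per-user cooldown semantics (nothing Kafka-specific; the User alias and shouldEmit helper are hypothetical):

import scala.concurrent.duration._

type User = String

// An alert for a user should pass only if no alert for that same user
// was emitted within the last 60 minutes.
def shouldEmit(lastEmittedMs: Map[User, Long], user: User, nowMs: Long): Boolean =
  lastEmittedMs.get(user).forall(lastMs => nowMs - lastMs >= 60.minutes.toMillis)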

What I tried:

Using aggregate as a stateful transformation, with the aggregate state being time-dependent. However, even though the resulting KTable shows no change in the aggregate value, the KTable (as a changelog) will continue to send elements downstream, thus not achieving the desired effect of "rate-limiting" the stream.

val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
  .groupBy((key, string) => string)
  .aggregate(
    () => "constant",
    (aggKey: String, value: String, aggregate: String) => aggregate,
    stringSerde,
    "name")
  .print


That delivers the following output:

[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)


It's generally unclear to me how/when aggregate decides to publish elements downstream. My original understanding was that it happens immediately, but that doesn't seem to be the case. Windowing shouldn't help here as far as I can see.

Is it possibly the case that the Kafka Streams DSL currently does not account for this use case of stateful transformation, similar to Spark's updateStateByKey or Akka's statefulMapConcat? Will I have to use the lower-level Processor/Transformer API?

EDIT:

The possible duplicate does go into how record caching causes some confusion about when aggregations publish elements downstream. However, the primary question here is how to achieve "rate-limiting" in the DSL. As @miguno points out, one has to fall back to the lower-level Processor API. Below I've pasted my approach, which is quite verbose:

import java.util
import scala.concurrent.duration._
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

// Config for the state store's changelog topic: override min.insync.replicas
val logConfig = new util.HashMap[String, String]()
logConfig.put("min.insync.replicas", "1")

case class StateRecord(alert: Alert, time: Long)

val countStore = Stores.create("Limiter")
  .withKeys(integerSerde)
  .withValues(new JsonSerde[StateRecord])
  .persistent()
  .enableLogging(logConfig)
  .build()
builder.addStateStore(countStore)

class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
  var context: ProcessorContext = _
  var store: KeyValueStore[Integer, StateRecord] = _

  override def init(context: ProcessorContext) = {
    this.context = context
    this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
  }

  override def transform(key: Integer, value: Alert) = {
    val current = System.currentTimeMillis()
    val newRecord = StateRecord(value, current)
    store.get(key) match {
      // Last alert for this key is older than the cooldown: forward it and remember the time.
      case StateRecord(_, time) if time + 15.seconds.toMillis < current =>
        store.put(key, newRecord)
        new KeyValue(key, value)
      // Still within the cooldown window: swallow the alert.
      case StateRecord(_, _) => null
      // First alert seen for this key: forward it and remember the time.
      case null =>
        store.put(key, newRecord)
        new KeyValue(key, value)
    }
  }

  override def punctuate(timestamp: Long) = null

  override def close() = {}
}
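
Wiring the transformer into the topology would then look roughly like this (a sketch only: alertStream is assumed to be a KStream[Integer, Alert] built elsewhere, the topic and serde names are illustrative, and the store name must match the "Limiter" store registered above):

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream, TransformerSupplier}

// alertStream is assumed to exist, e.g. builder.stream(integerSerde, alertSerde, "alerts")
val limited: KStream[Integer, Alert] = alertStream.transform(
  new TransformerSupplier[Integer, Alert, KeyValue[Integer, Alert]] {
    override def get() = new RateLimiter
  },
  "Limiter" // name of the state store the transformer looks up in init()
)
limited.to(integerSerde, alertSerde, "limited-alerts")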

Answer

Let's say I have a stream of (User, Alert) tuples. What I want is to rate-limit that stream per user, i.e. I want a stream that only outputs an alert for a given user once. For the following, let's say, 60 minutes, any incoming alert for that user should simply get swallowed. After those 60 minutes, an incoming alert should trigger again.

This is currently not possible when using the DSL of Kafka Streams. Instead, you can (and would need to) manually implement such behavior using the lower-level Processor API.

FYI: We have been having discussions in the Kafka community around whether or not to add such functionality (often called "triggers") to the DSL. So far, the decision has been not to add such functionality for the time being.

It's generally unclear to me how/when aggregate decides to publish elements downstream. My original understanding was that it was immediate, but that doesn't seem to be the case.

Yes, that was the initial behavior in Kafka 0.10.0.0. Since then (I'm not sure which version you are using) we introduced record caching; if you disable record caching you get back the initial behavior. That said, from what I understand record caching gives you some kind of (indirect) knob for rate-limiting, so most probably you'd want to keep caching enabled.

Unfortunately the Apache Kafka docs don't yet cover record caching; in the meantime you may want to read http://docs.confluent.io/current/streams/developer-guide.html#memory-management instead.
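
For reference, record caching is controlled through StreamsConfig; here is a minimal sketch (config keys as of Kafka 0.10.1, values purely illustrative) of how you would disable it to get back the forward-every-update behavior:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alert-rate-limiter")   // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")    // illustrative
// A cache size of 0 bytes disables record caching, so every aggregate
// update is forwarded downstream immediately (the pre-0.10.1 behavior).
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
// With caching enabled, buffered updates are flushed downstream whenever
// the cache fills up or a commit happens (commit.interval.ms).
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000")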
