I'm subscribing to Kafka using a pattern with a wildcard, as shown below. The wildcard represents a dynamic customer id.
>>> import msgpack
>>> from kafka import KafkaConsumer
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
The KafkaConsumer code supports subscribing to either a list of topics or a pattern:
def subscribe(self, topics=(), pattern=None, listener=None):
    """Subscribe to a list of topics, or a topic regex pattern

    Partitions will be dynamically assigned via a group coordinator.
    Topic subscriptions are not incremental: this list will replace the
    current assignment (if there is one).
So you can build a single regex that joins the alternatives with the OR operator (|). That should let you subscribe to several dynamic topic patterns at once, because the consumer internally uses Python's re module for matching.
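As a rough sketch of that idea: the topic names below (orders.customer-<id>, invoices.customer-<id>) and the helpers combine_patterns and subscribe_to_all are made up for illustration and are not part of kafka-python. It assumes the consumer compiles the pattern with re and tests each topic name from the start of the string, so every alternative is wrapped in its own group and anchored explicitly.

```python
import re

# Hypothetical topic families; the trailing part stands for the dynamic customer id.
TOPIC_PATTERNS = [
    r"^orders\.customer-.*$",
    r"^invoices\.customer-.*$",
]


def combine_patterns(patterns):
    """Join several topic regexes into one alternation.

    Each pattern is wrapped in a non-capturing group so the | operator
    applies to whole patterns rather than to fragments of them.
    """
    return "|".join(f"(?:{p})" for p in patterns)


combined = combine_patterns(TOPIC_PATTERNS)

# Check locally which topic names the combined pattern accepts.
matcher = re.compile(combined)


def subscribe_to_all(consumer):
    """Subscribe a consumer (expected to be a kafka.KafkaConsumer) to every
    topic matching the combined pattern. Because subscriptions are not
    incremental, all alternatives go into a single subscribe() call."""
    consumer.subscribe(pattern=combined)
```

With a real consumer you would call something like subscribe_to_all(KafkaConsumer(bootstrap_servers=..., group_id=...)), filling in the connection settings for your own cluster. Checking a few sample topic names with matcher.match(...) first is a cheap way to confirm the regex before involving a broker.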