I'm building a KStream-based topology and want to use .process() to write some intermediate values to a DB. This step doesn't change the nature of the data, so using a map/reduce sort of function seems unnecessary.
Adding a Processor in and of itself is a somewhat lengthy process as one needs to create a ProcessorSupplier and pass this instance to the KStream.process() function along with a State Store name. It's this step that has me a bit baffled.
Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore my-state-store is not added yet.
Process all elements in this stream, one element at a time, by applying a Processor.
Step 1: Defining the state store:
    StateStoreSupplier countStore = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .build();
- I don't see a way to add a StateStore object to my topology. It requires a StateStoreSupplier as well though.
Step 2: Adding the state store to your topology.
Option A - When using the Processor API:
    TopologyBuilder builder = new TopologyBuilder();

    // add the source processor node that takes Kafka topic "source-topic" as input
    builder.addSource("Source", "source-topic")
        .addProcessor("Process", () -> new WordCountProcessor(), "Source")
        // Add the countStore associated with the WordCountProcessor processor
        .addStateStore(countStore, "Process")
        .addSink("Sink", "sink-topic", "Process");
Option B - When using the Kafka Streams DSL:
Here you need to call KStreamBuilder#addStateStore(), passing in the StateStoreSupplier from Step 1, to add the state store to your processor topology. Then, when calling methods such as KStream#transform(), you must also pass in the name of the state store -- otherwise your application will fail at runtime.
Using KStream#transform() as an example:
    KStreamBuilder builder = new KStreamBuilder();

    StateStoreSupplier countStore = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .build();

    // Add the countStore that will be used within the Transformer[Supplier]
    // that we pass into `transform()` below.
    builder.addStateStore(countStore);

    KStream<byte[], String> input = builder.stream(inputTopic);

    KStream<String, Long> transformed =
        input.transform(/* your TransformerSupplier */, countStore.name());
- Is this even the appropriate way to do a 'one off'?
Not sure I fully understand this question. :-)
- Why is it necessary for a processor to have a state store? Seems like this should be optional for processors that don't maintain state.
You are right -- you don't need a state store if your processor does not maintain state.
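To make the rule concrete, here is a small toy model of the check that seems to sit behind the "is not added yet" error. It is only a sketch and not the Kafka Streams API: the class `ToyTopology` and its methods are hypothetical stand-ins, and the assumption is that the real builder validates each store name you pass against the stores registered so far. It uses plain Java so it runs without any Kafka dependency.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy stand-in for a topology builder; hypothetical, NOT the Kafka Streams API. */
public class ToyTopology {
    private final Set<String> registeredStores = new HashSet<>();
    private final List<String> processors = new ArrayList<>();

    /** Registers a store by name, loosely mirroring addStateStore(...). */
    public ToyTopology addStateStore(String storeName) {
        registeredStores.add(storeName);
        return this;
    }

    /** Adds a processor; store names are optional varargs, loosely mirroring process(...). */
    public ToyTopology process(String processorName, String... storeNames) {
        // Every store name that is passed in must already be registered.
        for (String store : storeNames) {
            if (!registeredStores.contains(store)) {
                throw new IllegalStateException("StateStore " + store + " is not added yet.");
            }
        }
        processors.add(processorName);
        return this;
    }

    public int processorCount() {
        return processors.size();
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();

        // Stateless processor: no store names passed, so nothing to validate.
        topology.process("db-writer");

        // Naming a store before registering it fails.
        try {
            topology.process("counter", "my-state-store");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "StateStore my-state-store is not added yet."
        }

        // Registering the store first makes the same call succeed.
        topology.addStateStore("my-state-store").process("counter", "my-state-store");
        System.out.println(topology.processorCount()); // prints "2"
    }
}
```

In this model, a processor that only writes to a DB passes no store names at all, which is the stateless case described above; a store only has to be added first when you actually name one.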
Edit (after the follow-up question): If you are using the DSL, you simply need to call KStreamBuilder#addStateStore() with your StateStoreSupplier to add the state store to your processor topology.