I have two Kafka topics streaming the exact same content from different sources, so that I have high availability if one of the sources fails.
I'm attempting to merge the two topics into one output topic using Kafka Streams 0.10.1.0, such that no messages are lost on failure and there are no duplicates when all sources are up.
When using a KStream-KStream leftJoin, I can't make this work because, as the documentation states, a "KStream-KStream leftJoin is always driven by records arriving from the primary stream", so records arriving only on the secondary stream are lost if the primary source is down. My current attempt instead outer-joins the two streams (keeping whichever side is non-null) and then uses a windowed reduce that keeps the first value, trying to collapse the duplicate join results:

```java
KStream<String, String> mergedStream = stream1.outerJoin(stream2,
        (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
        JoinWindows.of(2000L));

KStream<Windowed<String>, String> dedupedStream = mergedStream
        .groupByKey()
        .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)
        .toStream();
```
Using any kind of join will not solve your problem, as you will always end up either with missing results (an inner join, in case one of the streams stalls) or with "duplicates" containing null (a left or outer join, in case both streams are online). See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics for details on join semantics in Kafka Streams.
Thus, I would recommend using the Processor API, which you can mix and match with the DSL via
transformValues(). See http://stackoverflow.com/a/40837977/6167108 for more details.
You can also add a custom state store to your processor (see "How to add a custom StateStore to the Kafka Streams DSL processor?") to make the duplicate filtering fault-tolerant.
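To illustrate the idea, here is a minimal, self-contained sketch of the per-record de-duplication logic such a processor would run. It is plain Java, not Kafka Streams code: the `Deduplicator` class, its `forward()` method, and the `HashMap` standing in for a (fault-tolerant) state store are all assumptions made for the sketch, and the 2000 ms window mirrors the window from the question. A real `Transformer`/`transformValues()` implementation would keep the same key-to-first-seen-timestamp bookkeeping in an actual Kafka Streams state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of first-wins de-duplication over a time window.
// A HashMap stands in for the Kafka Streams state store.
public class Deduplicator {
    private final long windowMs;
    private final Map<String, Long> seen = new HashMap<>(); // key -> first-seen timestamp

    public Deduplicator(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true if the record is the first occurrence of the key within the window. */
    public boolean forward(String key, long timestampMs) {
        Long firstSeen = seen.get(key);
        if (firstSeen != null && timestampMs - firstSeen < windowMs) {
            return false; // duplicate from the redundant topic: drop it
        }
        seen.put(key, timestampMs); // first occurrence (or window expired): forward it
        return true;
    }

    public static void main(String[] args) {
        Deduplicator dedup = new Deduplicator(2000L);
        List<String> out = new ArrayList<>();
        // the same key arrives from both source topics within the window
        if (dedup.forward("k1", 100L)) out.add("k1@100");
        if (dedup.forward("k1", 150L)) out.add("k1@150");   // duplicate -> dropped
        if (dedup.forward("k1", 3000L)) out.add("k1@3000"); // window expired -> forwarded
        System.out.println(out); // prints [k1@100, k1@3000]
    }
}
```

Because both topics carry identical content, forwarding only the first occurrence of each key per window yields exactly one copy when both sources are up, and still yields one copy (from whichever topic is alive) when a source fails.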