acekiller acekiller - 1 month ago 10

Confusion about Storm's acker and guaranteed message processing

I am learning about Storm's guaranteed message processing and am confused by some of the concepts in this part.

To guarantee that a message emitted by a spout is fully processed, Storm uses ackers. Each time a spout emits a tuple, the acker assigns it an "ack val", initialized to 0, that stores the status of the tuple tree. Each time a downstream bolt emits a new tuple or acks an "old" tuple, that tuple's ID is XORed into the "ack val". The acker only needs to check whether the "ack val" is 0 to know whether the tuple has been fully processed. Consider the code below:
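
To make the XOR bookkeeping described above concrete, here is a minimal plain-Java sketch of it. It does not use Storm's API: the class name AckValSketch and the three tuple IDs are made up for illustration (Storm's real tuple IDs are random 64-bit values), and it assumes one spout tuple A whose bolt emits two anchored tuples B and C.

```java
/** Hypothetical stand-in for the acker's per-tuple-tree state. */
final class AckValSketch {
    private long ackVal = 0L;

    /** XOR one or more tuple IDs into the ack val. */
    void update(long xorOfIds) {
        ackVal ^= xorOfIds;
    }

    /** Every ID has been XORed in twice (emit + ack) once this is true. */
    boolean fullyProcessed() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        // Illustrative fixed IDs; real IDs would be random longs.
        long a = 0x1001L, b = 0x2002L, c = 0x3003L;

        AckValSketch tree = new AckValSketch();
        tree.update(a);          // spout emits A
        tree.update(a ^ b ^ c);  // bolt acks A after emitting B and C anchored to A
        tree.update(b);          // downstream bolt acks B
        System.out.println(tree.fullyProcessed()); // prints false: C is still pending
        tree.update(c);          // downstream bolt acks C
        System.out.println(tree.fullyProcessed()); // prints true
    }
}
```

Because XOR is its own inverse, the order in which emits and acks arrive does not matter. The spout from the tutorial that the question is about follows.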

public class WordReader implements IRichSpout {
    ... ...
    while ((str = reader.readLine()) != null) {
        this.collector.emit(new Values(str), str);
        ... ...
    }


The code above is a spout from the word count program in the "Getting Started with Storm" tutorial. In the emit method, the second parameter, "str", is the messageId. I am confused by this parameter:
1) As I understand it, each time a tuple (i.e., a message) is emitted, whether by a spout or by a bolt, it should be Storm's responsibility to assign a 64-bit messageId to that message. Is that correct? Or is "str" here just a human-readable alias for the message?
2) Whatever the answer to 1) is, "str" here would be the same word in two different messages, because a text file usually contains many duplicate words. If so, how does Storm differentiate the messages? And what is the meaning of this parameter?
3) In some code, I see spouts use the following to set the message ID in the spout's emit method:

public class RandomIntegerSpout extends BaseRichSpout {
    private long msgId = 0;
    collector.emit(new Values(..., ++msgId), msgId);
}


This is much closer to what I expect: the message ID should be different for every message. But this code raises another question: what happens to the private field "msgId" across different executors? Each executor has its own msgId initialized to 0, so the messages in every executor will be numbered 1, 2, 3, and so on. How does Storm differentiate these messages?

I am new to Storm, so maybe these questions are naive. I hope someone can help me figure this out. Thanks!

Answer

About message IDs in general: internally it might be a 64-bit value, but this 64-bit value is computed as a hash of the msgID object provided to emit() within the spout. So you can hand in any object as the message ID (the probability that two objects hash to the same value is close to zero).

About using str: I think that in this example str contains a line (not a word), and it is very unlikely that a document contains the exact same line twice (except for empty lines, of which there might be many).

About the counter as message ID: you are absolutely right about your observation. If multiple spout tasks are running in parallel, this would give message ID conflicts and would break fault tolerance.
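
The overlap is easy to reproduce outside Storm. The following is only a sketch in plain Java: CounterCollisionSketch and NaiveCounter are hypothetical names, and each NaiveCounter stands in for one spout task that keeps its own msgId field starting at 0.

```java
import java.util.HashSet;
import java.util.Set;

final class CounterCollisionSketch {

    /** Mimics the per-instance "private long msgId = 0" counter. */
    static final class NaiveCounter {
        private long msgId = 0;

        long next() {
            return ++msgId;
        }
    }

    /** Returns the IDs that two independent counters both hand out. */
    static Set<Long> overlap(int emitsPerTask) {
        NaiveCounter taskA = new NaiveCounter();
        NaiveCounter taskB = new NaiveCounter();

        Set<Long> fromA = new HashSet<>();
        for (int i = 0; i < emitsPerTask; i++) {
            fromA.add(taskA.next());
        }

        Set<Long> both = new HashSet<>();
        for (int i = 0; i < emitsPerTask; i++) {
            long id = taskB.next();
            if (fromA.contains(id)) {
                both.add(id);
            }
        }
        return both;
    }

    public static void main(String[] args) {
        // Both counters produce 1, 2, 3, so every ID is shared.
        System.out.println(overlap(3).size()); // prints 3
    }
}
```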

If you want to "fix" the counter approach, each counter should be initialized differently (best, from 1 to #SpoutTasks). You can use the task ID for this (it is unique and can be accessed via the TopologyContext provided in Spout.open()). Basically, you get the task IDs of all parallel spout tasks, sort them, and assign each spout task its position in that order. Furthermore, you need to increment by the number of parallel spout tasks instead of by 1.
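
The scheme above can be sketched roughly as follows. This is plain Java without any Storm dependency, and StridedMessageIds is a hypothetical helper name. In a real spout I would expect the two constructor arguments to come from the TopologyContext in open(), for example via context.getComponentTasks(context.getThisComponentId()) and context.getThisTaskId(), but treat that wiring as an assumption and check it against your Storm version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hands out message IDs that never overlap between parallel spout tasks. */
final class StridedMessageIds {
    private final int stride; // number of parallel spout tasks
    private long next;        // next ID this task will hand out

    StridedMessageIds(List<Integer> spoutTaskIds, int thisTaskId) {
        // Sort a copy so every task derives the same ordering.
        List<Integer> sorted = new ArrayList<>(spoutTaskIds);
        Collections.sort(sorted);

        int index = sorted.indexOf(Integer.valueOf(thisTaskId));
        if (index < 0) {
            throw new IllegalArgumentException("unknown task ID: " + thisTaskId);
        }
        this.stride = sorted.size();
        this.next = index + 1L; // start values are 1..#SpoutTasks
    }

    /** Returns the current ID and advances by the number of spout tasks. */
    long nextId() {
        long id = next;
        next += stride;
        return id;
    }

    public static void main(String[] args) {
        // Illustrative task IDs 7, 3, 5; task 5 is second in sorted order.
        StridedMessageIds ids = new StridedMessageIds(Arrays.asList(7, 3, 5), 5);
        System.out.println(ids.nextId()); // prints 2
        System.out.println(ids.nextId()); // prints 5
        System.out.println(ids.nextId()); // prints 8
    }
}
```

With three tasks, the first task in sorted order produces 1, 4, 7, the second 2, 5, 8, and the third 3, 6, 9, so the sequences never collide. In the spout, the emit call would then use nextId() in place of ++msgId.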