ItayB ItayB - 6 months ago 30
Node.js Question

Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)

I have an app that produces and consumes messages with Kafka and Protocol Buffers, and everything works great. I serialize the protocol buffer messages with

SerializeAsString()
(this app was written in c++).

Now I've added a new Node.js website that also consumes the messages and tries to decode them.

My js code (using the great ProtoBuf.js module):

// Load the aggregate schema file, build the "protobuf" package from it, and
// keep direct references to the generated types used below.
var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto");
var protobuf = builder.build("protobuf");
var Trace = protobuf.Trace;
var MessageType = protobuf.MessageType;
var MessageTypeAck = protobuf.MessageTypeAck;
var MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;

/**
 * Find the name of the MessageType entry whose value equals `val`.
 *
 * @param {*} val - Value compared (with ===) against each MessageType entry.
 * @returns {string|undefined} The first matching key, or undefined when no
 *     entry matches.
 */
function getMessageType(val) {
    var names = Object.keys(MessageType);
    for (var i = 0; i < names.length; i++) {
        if (MessageType[names[i]] === val) {
            return names[i];
        }
    }
    return undefined;
}

// For every consumed message: resolve the type name from the first element of
// the message key, decode the value with the matching type, and log the result.
consumer.on('message', function (message) {
    try {
        var typeName = getMessageType(message.key[0]);
        if (typeName === 'MESSAGE_TYPE_ACK') {
            console.log(MessageTypeAck.decode(message.value));
        } else if (typeName === 'MESSAGE_TYPE_KEEP_ALIVE') {
            console.log(MessageTypeKeepAlive.decode(message.value));
        } else {
            console.log("Unknown message type");
        }
    } catch (e) {
        // Log e.decoded when the thrown error carries one; otherwise log the
        // error itself.
        console.log(e.decoded ? e.decoded : e);
    }
});


Result:

[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]


My proto files:

Trace.proto:

package protobuf;

// One trace entry; embedded as a repeated field in the message types of Messages.proto.
// NOTE(review): the field names suggest the Kafka topic/partition/offset of a message — confirm.
message Trace {
optional string topic = 1;
optional int32 partition = 2;
optional int64 offset = 3;
}


MessageType.proto

package protobuf;

// Discriminator for the message kinds. The Node.js consumer looks up the first
// element of the Kafka message key among these values to pick a decoder.
enum MessageType {
MESSAGE_TYPE_ACK = 1;
MESSAGE_TYPE_KEEP_ALIVE = 2;
}


Messages.proto:

import "Trace.proto";

package protobuf;

// Payload the consumer decodes when the message type is MESSAGE_TYPE_ACK.
message MessageTypeAck {
// Zero or more Trace entries.
repeated Trace trace = 1;

optional string sourceModuleName = 2;
// int32 is varint-encoded, i.e. wire type 0 — the type the decoder expects for this field.
optional int32 sourceModuleID = 3;
}

// Payload the consumer decodes when the message type is MESSAGE_TYPE_KEEP_ALIVE.
// Its fields are declared identically to those of MessageTypeAck.
message MessageTypeKeepAlive {
// Zero or more Trace entries.
repeated Trace trace = 1;

optional string sourceModuleName = 2;
// int32 is varint-encoded, i.e. wire type 0.
optional int32 sourceModuleID = 3;
}


All.proto

// Aggregate schema: imports every other .proto file so that loading this one
// file makes all message and enum types available.
// Each import statement must be terminated with a semicolon.
import "Trace.proto";
import "MessageType.proto";
import "Messages.proto";


What am I doing wrong? (decode?)

Answer

Thanks to this SO question & answer, I figured it out! The problem is related to the way I consumed the buffer from Kafka - as utf-8 (the default). It is actually related to code which I didn't attach:

// Consumer for partition 0 of the 'Genesis' topic. The encoding option is set
// to 'buffer' so message payloads are not converted to strings.
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client('localhost:2181');
var consumer = new Consumer(
    client,
    [{ topic: 'Genesis', partition: 0 }],
    { autoCommit: false, encoding: 'buffer' }
);

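The solution was to add the encoding: 'buffer' line (the default is 'utf-8', as mentioned here). With the default encoding, the binary protobuf payload is converted to a string, which corrupts its bytes before decode() ever sees them - hence the "Illegal wire type" error.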