user63898 user63898 - 1 month ago 23
Java Question

Netty getting An exceptionCaught() event was fired, and it reached at the tail of the pipeline on TextWebsocketEncoder

I try to do simple web socket decode and then encode but I'm getting this exception when it pass the TextWebsocketDecoder handler:

io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:73)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:59)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:112)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler$1.channelRead(WebSocketServerProtocolHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)


What I have is simple Initializer which work find until TextWebsocketEncoder:

public class ServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;

public GameServerInitializer(ChannelGroup group) {
this.group = group;
}

@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler(group));
pipeline.addLast("textWebsocketDecoder",new TextWebsocketDecoder());
pipeline.addLast("textWebsocketEncoder",new TextWebsocketEncoder());
}
}


TextWebSocketFrameHandler

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
private final ChannelGroup group;

public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {

ctx.pipeline().remove(HttpRequestHandler.class);

group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));

group.add(ctx.channel());

} else {
super.userEventTriggered(ctx, evt);
}
}

@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
ctx.fireChannelRead(msg);
//group.writeAndFlush(msg.retain());
}
}


and this are the TextWebsocketDecoder and TextWebsocketEncoder :

TextWebsocketDecoder :

public class TextWebsocketDecoder extends MessageToMessageDecoder<TextWebSocketFrame>
{

@Override
protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame frame, List<Object> out) throws Exception
{
String json = frame.text();
JSONObject jsonObject = new JSONObject(json);
int type = jsonObject.getInt("type");
JSONArray msgJsonArray = jsonObject.getJSONArray("msg");
String user = msgJsonArray.getString(0);
String pass = msgJsonArray.getString(1);
String connectionkey = msgJsonArray.getString(2);
int timestamp = jsonObject.getInt("timestamp");

JSONObject responseJson = new JSONObject();
responseJson.put("type",Config.LOGIN_SUCCESS);
responseJson.put("connectionkey",connectionkey);

out.add(responseJson); // After This im getting the exception !!!
}
}


TextWebsocketEncoder

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class TextWebsocketEncoder extends MessageToMessageEncoder<JSONObject>
{

@Override
protected void encode(ChannelHandlerContext arg0, JSONObject arg1, List<Object> out) throws Exception {
String json = arg1.toString();
out.add(new TextWebSocketFrame(json));
}

}

Answer

The exception

Inside your TextWebSocketFrameHandler, you are calling ctx.fireChannelRead(msg);, this passes the message up 1 chain, however MessageToMessageDecoder isn't prepared to deal with this. To explain this problem I need to explain how the MessageToMessageDecoder works.

MessageToMessageDecoder works by catching every message from the upstream and passing them to your custom code, your custom code handles the work, and the mtmd handles the closing of the resource you passed in.

Since you are passing the reference to the other side, you are effectively closing the WebSocketFrame multiple times, causing bugs. MessageToMessageDecoder even warns you for this in the javadoc.

To solve the problem, we follow the instruction in the manual and make our channelRead the following:

@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
     msg.retain(); // ferrybig: fixed bug http://stackoverflow.com/q/34634750/1542723
     ctx.fireChannelRead(msg);
     //group.writeAndFlush(msg.retain());
}

The not sending back problem

Inside your comments, you stated the code doesn't send anything back. This is expected as your pipeline only consumes data and passes it up the chain. To fix this, it would require some rework at your pipeline.

  1. We need to swap the order of the json-webframe decoder and encoder:

    pipeline.addLast("textWebsocketDecoder",new TextWebsocketEncoder());
    pipeline.addLast("textWebsocketEncoder",new TextWebsocketDecoder());
    

    This is because your Decoder is generating the output that would be send back ↑ the chain of handlers, this output won't be seen by the encoder if the decoder was above that. (Your decoder shouldn't be called a decoder following the netty naming)

  2. We need to change your decoder to send the generated data actually back ↑ the chain instead of ↓ into the non-existing void.

    To make these changes, we going to let the TextWebSocketDecoder extend ChannelInboundHandlerAdapter instead of MessageToMessageDecoder<TextWebSocketFrame> since we are handling messages instead of passing them to a other handler.

    We are changing the signature of the decode method to channelRead(ChannelHandlerContext ctx, Object msg), and add some boilerplate code:

    public void channelRead(ChannelHandlerContext ctx, Object msg) /* throws Exception */
        TextWebSocketFrame frame = (TextWebSocketFrame) msg;
        try {
            /* Remaining code, follow the steps further of see end result */
        } finally {
            frame.release();
        }
    }
    
  3. We adapt our code to pass the result up the pipeline instead of down:

    public void channelRead(ChannelHandlerContext ctx, Object msg) /* throws Exception */
        TextWebSocketFrame frame = (TextWebSocketFrame) msg;
        try {
    
            String json = frame.text(); 
            JSONObject jsonObject = new JSONObject(json);
            int type = jsonObject.getInt("type");
            JSONArray msgJsonArray = jsonObject.getJSONArray("msg");
            String user = msgJsonArray.getString(0);
            String pass = msgJsonArray.getString(1);
            String connectionkey = msgJsonArray.getString(2);
            int timestamp = jsonObject.getInt("timestamp");
    
            JSONObject responseJson = new JSONObject();
            responseJson.put("type",Config.LOGIN_SUCCESS);
            responseJson.put("connectionkey",connectionkey);
    
            ctx.writeAndFlush(responseJson)
        } finally {
            frame.release();
        }
    }
    

Notice that you may be tempted to remove our previous code from the exception, but doing this will trigger undefined behavior when ran under the async nature of netty.