Netty NioSocketChannel receives corrupted messages on multithreaded writes

I am writing my own simple game server using Netty 4.0.25. I want to support msgpack binary data, so every message sent or received on the channel is encoded or decoded with msgpack.

My server:

    bossGroup = new NioEventLoopGroup(4);
    workerGroup = new NioEventLoopGroup(4);

    bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.handler(new LoggingHandler(LogLevel.DEBUG));
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
            ch.pipeline().addLast(new NettyNioBinaryTcpSession());
        }
    });

    // Bind and start to accept incoming connections.
    channelFuture = bootstrap.bind(this.getGatewayConfig().getHost(), this.getGatewayConfig().getPort());

And the client:

    group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc(), getServerAddress().getHost(), getServerAddress()
                            .getPort()));
            }
            p.addLast("frameEncoder", new LengthFieldPrepender(4));
            p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
            p.addLast(new SessionClientHandler());
        }
    });

    // Start the client.
    channelFuture = bootstrap.connect(getServerAddress().getHost(), getServerAddress().getPort());

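Everything works fine until I try to send 1000 messages per second from the client and have the server send them back. My server works normally and logs no errors, but on my client I get: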

    2015-03-04 20:20:22.173 [SocketReceivingWorker-3] WARN  com.puppet.client.io.SocketReceivingHandler (SocketReceivingHandler.java:71) - got pong message, but message id not found���� -> length: 4 -> base 64: 1//z/Q==

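Every message sent from the client to the server and back is 12 bytes in size.

I don't know why this happens. I have been investigating for several days without finding the cause.

Thanks in advance,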

Update: SocketReceivingHandler.java:

    package com.puppet.client.io;

    import static org.msgpack.template.Templates.TByteArray;
    import static org.msgpack.template.Templates.TString;
    import static org.msgpack.template.Templates.tMap;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.Base64;
    import java.util.Map;

    import org.msgpack.MessagePack;
    import org.msgpack.MessageTypeException;
    import org.msgpack.template.Template;
    import org.msgpack.unpacker.Unpacker;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.lmax.disruptor.WorkHandler;
    import com.puppet.client.PuppetClient;
    import com.puppet.client.exceptions.MessageUnrecognizedException;
    import com.puppet.common.Fields;
    import com.puppet.common.PuCommand;
    import com.puppet.common.PuEventType;
    import com.puppet.common.message.concrete.PingMessage;
    import com.puppet.eventdriven.impl.BaseEvent;

    public class SocketReceivingHandler implements WorkHandler<SocketReceivingEvent> {

        private static final Logger logger = LoggerFactory.getLogger(SocketReceivingHandler.class);

        private static final MessagePack msgpack = new MessagePack();
        private static Template<Map<String, byte[]>> stringToBytesMapTemplate = tMap(TString, TByteArray);

        private PuppetClient client;

        public SocketReceivingHandler(PuppetClient client) {
            this.client = client;
        }

        @Override
        public void onEvent(SocketReceivingEvent event) throws Exception {

            ByteArrayInputStream byteArrInputStream = new ByteArrayInputStream(event.getData());
            Unpacker unpacker = msgpack.createUnpacker(byteArrInputStream);

            try {
                Map<String, byte[]> request = unpacker.read(stringToBytesMapTemplate);
                PuCommand command = PuCommand.fromId(request.get(Fields.COMMAND));
                if (command != null) {
                    handleCommand(command, request.get(Fields.DATA));
                } else {
                    throw new MessageUnrecognizedException();
                }
            } catch (MessageTypeException mtEx) {
                logger.debug("message error: " + new String(event.getData()) + " -> length " + event.getData().length
                        + " -> base64: " + Base64.getEncoder().encodeToString(event.getData()));
                throw new MessageUnrecognizedException(mtEx);
            }
        }

        private void handleCommand(PuCommand command, byte[] data) throws IOException {
            switch (command) {
            case PONG:
                this.client.dispatchEvent(new BaseEvent(PuEventType.PONG, "data", data));
                Long startTime = PingMessage.getTimeForMessageId(data);
                PingMessage.removeTimeForMessageId(data);
                if (startTime != null) {
                    this.client.dispatchEvent(new BaseEvent(PuEventType.PING_PONG, "delay", System.nanoTime() - startTime));
                } else {
                    logger.warn("got pong message, but message id not found" + new String(data) + " -> length: "
                            + data.length + " -> base 64: " + Base64.getEncoder().encodeToString(data));
                }
                break;
            default:
                logger.debug("unrecognized command: " + command);
                break;
            }
        }
    }

event.getData() ==> the data read from the socket in the channelRead method.

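This was my mistake: for the ping-pong message id I was using random bytes instead of an auto-incrementing int. Sometimes that id was duplicated, and that is when the error occurred.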
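For anyone hitting the same thing, here is a minimal sketch of the auto-incrementing approach. The real PingMessage class is not shown in this post, so `PingIdRegistry`, `register` and `complete` are hypothetical names I am using for illustration only, and the 4-byte id width is an assumption based on the "length: 4" in the log line above.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of the original code base): hands out
// ping ids from a thread-safe counter instead of random bytes.
final class PingIdRegistry {

    // Counter shared by all sending threads; getAndIncrement() is atomic.
    private final AtomicInteger nextId = new AtomicInteger();

    // Pending pings: id -> start time in nanoseconds.
    private final Map<Integer, Long> startTimes = new ConcurrentHashMap<>();

    /** Records the start time of a new ping and returns its 4-byte id. */
    byte[] register(long startNanos) {
        int id = nextId.getAndIncrement();
        startTimes.put(id, startNanos);
        return ByteBuffer.allocate(4).putInt(id).array();
    }

    /** Removes and returns the start time for the id, or null if it is unknown. */
    Long complete(byte[] idBytes) {
        if (idBytes == null || idBytes.length != 4) {
            return null;
        }
        return startTimes.remove(ByteBuffer.wrap(idBytes).getInt());
    }
}
```

With a counter, two pings that are in flight at the same time cannot share an id unless the int wraps around while the older one is still pending, so a pong always finds the start time that belongs to it.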

Thanks!