如何并行处理部分排序的消息?

How to parallelize processing of partially ordered messages?

我的基于 Netty 的应用程序

  1. 每秒从单个 TCP 连接接收数十万条消息
  2. 在几个入站处理程序中处理这些消息
  3. 将处理结果发送到下游某处

目前,所有这些都是 运行ning 在线程上,因为它是在单个 TCP 连接上。我想知道如何并行化 2。困难在于消息不能随意并行处理,因为消息存在部分顺序。你可以认为这是有一个key(message)函数,这个函数returns相同结果的所有消息需要顺序处理,但如果结果不同,它们可能运行在平行下。所以我正在考虑从消息到线程的映射,例如 hash(key(message)) % threadCount.

想象一下这条管道:

pipeline.addLast(deframer);
pipeline.addLast(new IdleStateHandler(...));
pipeline.addLast(decoder);
pipeline.addLast(bizLogicHandler1);
pipeline.addLast(bizLogicHandler2);

在解码器中,我能够计算 key(message) 的结果,所以我想并行化解码器下游的所有内容。 documented 为了使用多线程我可以做

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
pipeline.addLast(group, "bizLogicHandler1", bizLogicHandler1);
pipeline.addLast("bizLogicHandler2", bizLogicHandler2);

我猜这意味着 bizLogicHandler1 及其下面的所有内容(在上面的示例中是 bizLogicHandler2)将能够 运行 并行? (或者我还必须为 bizLogicHandler2 指定 group 吗?)

然而,上面的内容仍然 运行 完全串行,正如文档所解释的那样,提供 UnorderedThreadPoolEventExecutor 作为最大化并行性的替代方案,代价是完全摆脱排序,这是行不通的就我而言。

查看接口 EventExecutorGroupEventExecutor,我看不出如何传达哪些消息可以并行处理,哪些消息必须顺序处理。

有什么想法吗?

事实证明,这很容易用一个 LocalServerChannel and as many LocalChannel 秒来实现所需的并行性。

服务器通道将接收消息并将它们发送到客户端通道之一。另一个方向(从客户端通道到服务器通道)也适用。我以这种方式成功地并行化了一个应用程序,以实现更高的吞吐量,扩展到更多内核。

这是一个基本版本,去掉了大部分错误处理、日志记录和业务逻辑:

public class Parallelizer extends SimpleChannelInboundHandler<Message> {

    private static final AtomicInteger EPHEMERAL_PORT = new AtomicInteger(0);
    private final Channel[] internalChannels;
    private final AtomicReference<ChannelHandlerContext> upstreamCtx = new AtomicReference<>(null);

    public Parallelizer(EventLoopGroup eventLoopGroup, List<ChannelInitializer<Channel>> channelInitializers) throws InterruptedException {
        internalChannels = (Channel[]) Array.newInstance(Channel.class, channelInitializers.size());

        int port = EPHEMERAL_PORT.getAndIncrement();
        final LocalAddress addr = new LocalAddress("PARALLELIZER-" + port);
        createServerChannel(eventLoopGroup, addr);

        channelInitializers.forEach(channelChannelInitializer -> createClientChannel(eventLoopGroup, addr, channelChannelInitializer));

        waitForInternalClientsToConnect();
    }

    private void waitForInternalClientsToConnect() throws InterruptedException {
        synchronized (internalChannels) {
            while (internalChannels[internalChannels.length - 1] == null) {
                internalChannels.wait();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    private void dispatch(Message msg, int clientChannelIdx) {
        Channel channel = internalChannels[clientChannelIdx];
        channel.writeAndFlush(msg, channel.voidPromise());
    }

    private void createClientChannel(EventLoopGroup eventLoopGroup, LocalAddress addr, ChannelInitializer<Channel> channelInitializer) {
        Bootstrap cb = new Bootstrap();
        cb.group(eventLoopGroup)
                .channel(LocalChannel.class)
                .handler(channelInitializer);
        cb.connect(addr);
    }

    private void createServerChannel(EventLoopGroup eventLoopGroup, LocalAddress addr) throws InterruptedException {
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(eventLoopGroup)
                .channel(LocalServerChannel.class)
                .childHandler(new ChannelInitializer<LocalChannel>() {
                    @Override
                    public void initChannel(LocalChannel ch) {
                        ch.pipeline().addLast(new InternalHandler());
                    }
                });
        sb.bind(addr).sync();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
        final int hash = msg.getDistributionKey().hashCode();
        int clientChannelIdx = Integer.remainderUnsigned(hash, internalChannels.length);
        dispatch(msg, clientChannelIdx);
    }

    private class InternalHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            synchronized (internalChannels) {
                final int firstNull = Arrays.asList(internalChannels).indexOf(null);
                internalChannels[firstNull] = ctx.channel();
                internalChannels.notify();
            }
            super.channelActive(ctx);
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
            final ChannelHandlerContext upstreamCtx = Parallelizer.this.upstreamCtx.get();
            msg.retain();
            if (upstreamCtx != null)
                upstreamCtx.writeAndFlush(msg, upstreamCtx.channel().voidPromise());
        }
    }
}