How to parallelize processing of partially ordered messages?
My Netty-based application:
1. receives hundreds of thousands of messages per second from a single TCP connection
2. processes these messages in a few inbound handlers
3. sends the results of the processing somewhere downstream
Currently all of this runs on a single thread, since everything arrives on a single TCP connection. I would like to know how to parallelize step 2. The difficulty is that the messages cannot be processed in parallel arbitrarily, because there is a partial order among them. You can think of it as a key(message) function: all messages for which this function returns the same result have to be processed sequentially, but messages with different results may run in parallel. So I am thinking of a mapping from messages to threads, something like hash(key(message)) % threadCount.
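To make that concrete, the dispatch I have in mind is essentially the following sketch (key() is a placeholder for whatever extracts the ordering key from my message type):

// Messages with equal keys always map to the same worker index and therefore keep
// their relative order; messages with different keys may land on different workers.
int workerIndex(Message msg, int threadCount) {
    // key(msg) is a placeholder for the function described above.
    return Math.floorMod(key(msg).hashCode(), threadCount);
}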
Imagine this pipeline:
pipeline.addLast(deframer);
pipeline.addLast(new IdleStateHandler(...));
pipeline.addLast(decoder);
pipeline.addLast(bizLogicHandler1);
pipeline.addLast(bizLogicHandler2);
In the decoder I am able to compute the result of key(message), so I would like to parallelize everything downstream of the decoder. It is documented that in order to use multiple threads I can do:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
pipeline.addLast(group, "bizLogicHandler1", bizLogicHandler1);
pipeline.addLast("bizLogicHandler2", bizLogicHandler2);
I am guessing this means that bizLogicHandler1 and everything below it (bizLogicHandler2 in the example above) will be able to run in parallel? (Or do I also have to specify the group for bizLogicHandler2?)
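For reference, the variant I am asking about, with both handlers explicitly bound to the group, would be:

pipeline.addLast(group, "bizLogicHandler1", bizLogicHandler1);
pipeline.addLast(group, "bizLogicHandler2", bizLogicHandler2);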
However, the above still runs completely serially. As the documentation explains, it offers UnorderedThreadPoolEventExecutor as an alternative that maximizes parallelism, but at the cost of giving up ordering entirely, which does not work in my case.
Looking at the EventExecutorGroup and EventExecutor interfaces, I do not see how I could communicate which messages may be processed in parallel and which must be processed sequentially.
Any ideas?
It turns out that the desired parallelism is easy to achieve with one LocalServerChannel and as many LocalChannels as needed. The server channel receives the messages and dispatches them to one of the client channels; the other direction (from the client channels back to the server channel) works as well. I have successfully parallelized an application this way to achieve higher throughput, scaling to more cores.
Here is a basic version with most of the error handling, logging, and business logic stripped out:
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Message is the application's decoded message type (produced by the decoder upstream).
public class Parallelizer extends SimpleChannelInboundHandler<Message> {

    // Counter used only to give each Parallelizer instance a unique LocalAddress.
    private static final AtomicInteger EPHEMERAL_PORT = new AtomicInteger(0);

    // The server-side ends of the internal connections, one per supplied ChannelInitializer.
    // Writing a message to one of them delivers it to the paired client channel, whose
    // pipeline contains the business-logic handlers.
    private final Channel[] internalChannels;

    // Context of the outer (TCP) channel; results coming back on the internal
    // connections are forwarded to it.
    private final AtomicReference<ChannelHandlerContext> upstreamCtx = new AtomicReference<>(null);

    public Parallelizer(EventLoopGroup eventLoopGroup,
                        List<ChannelInitializer<Channel>> channelInitializers) throws InterruptedException {
        internalChannels = new Channel[channelInitializers.size()];
        int port = EPHEMERAL_PORT.getAndIncrement();
        final LocalAddress addr = new LocalAddress("PARALLELIZER-" + port);
        createServerChannel(eventLoopGroup, addr);
        channelInitializers.forEach(initializer -> createClientChannel(eventLoopGroup, addr, initializer));
        waitForInternalClientsToConnect();
    }

    private void waitForInternalClientsToConnect() throws InterruptedException {
        synchronized (internalChannels) {
            // Each accepted connection fills the first empty slot, so once the last slot
            // is non-null all internal channels are connected.
            while (internalChannels[internalChannels.length - 1] == null) {
                internalChannels.wait();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Remember the outer channel's context so InternalHandler can forward results to it.
        upstreamCtx.set(ctx);
        super.channelActive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
        // Same key -> same hash -> same internal channel, so per-key ordering is preserved.
        final int hash = msg.getDistributionKey().hashCode();
        int clientChannelIdx = Integer.remainderUnsigned(hash, internalChannels.length);
        dispatch(msg, clientChannelIdx);
    }

    private void dispatch(Message msg, int clientChannelIdx) {
        Channel channel = internalChannels[clientChannelIdx];
        channel.writeAndFlush(msg, channel.voidPromise());
    }

    private void createClientChannel(EventLoopGroup eventLoopGroup, LocalAddress addr,
                                     ChannelInitializer<Channel> channelInitializer) {
        Bootstrap cb = new Bootstrap();
        cb.group(eventLoopGroup)
          .channel(LocalChannel.class)
          .handler(channelInitializer);
        // No need to wait on the future; waitForInternalClientsToConnect() blocks until all are active.
        cb.connect(addr);
    }

    private void createServerChannel(EventLoopGroup eventLoopGroup, LocalAddress addr) throws InterruptedException {
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(eventLoopGroup)
          .channel(LocalServerChannel.class)
          .childHandler(new ChannelInitializer<LocalChannel>() {
              @Override
              public void initChannel(LocalChannel ch) {
                  ch.pipeline().addLast(new InternalHandler());
              }
          });
        sb.bind(addr).sync();
    }

    // Runs on the server-side ends of the internal connections: registers each accepted
    // channel for dispatching, and forwards results produced by the business-logic
    // pipelines back to the outer channel.
    private class InternalHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            synchronized (internalChannels) {
                // Register this channel in the first free slot and wake up the constructor.
                final int firstNull = Arrays.asList(internalChannels).indexOf(null);
                internalChannels[firstNull] = ctx.channel();
                internalChannels.notify();
            }
            super.channelActive(ctx);
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
            final ChannelHandlerContext upstreamCtx = Parallelizer.this.upstreamCtx.get();
            if (upstreamCtx != null) {
                // retain() because SimpleChannelInboundHandler releases msg after this method returns.
                upstreamCtx.writeAndFlush(msg.retain(), upstreamCtx.channel().voidPromise());
            }
        }
    }
}
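Wiring it into the pipeline from the question could look roughly like this. This is only a sketch: BizLogicHandler1/BizLogicHandler2, the parallelism level, and the idle-timeout values are placeholders, and it assumes each internal pipeline ends by writing its encoded result (a ByteBuf) back to its own channel so that InternalHandler can forward it to the outer connection.

// Hypothetical wiring; the handler classes and numbers below are placeholders.
int parallelism = 16;
List<ChannelInitializer<Channel>> initializers = new ArrayList<>();
for (int i = 0; i < parallelism; i++) {
    initializers.add(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            // Each internal channel gets its own handler instances, so the parallel
            // pipelines share no state. The last handler is expected to write the
            // encoded result (a ByteBuf) to the channel.
            ch.pipeline().addLast(new BizLogicHandler1());
            ch.pipeline().addLast(new BizLogicHandler2());
        }
    });
}

pipeline.addLast(deframer);
pipeline.addLast(new IdleStateHandler(0, 0, 60)); // timeout values are placeholders
pipeline.addLast(decoder);
pipeline.addLast(new Parallelizer(eventLoopGroup, initializers));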