将 netty 通道传递给队列并稍后将其用于不同线程上的写入是否有效?

Is it valid to pass netty channels to a queue and use it for writes on a different thread later on?

我有以下设置。有一个消息分发器根据名为 appId 的唯一标识符(每个连接的客户端)将入站客户端消息传播到配置数量的消息队列(在我的例子中是 LinkedBlockingQueues):

public class MessageDistributor {

    private final List<BlockingQueue<MessageWrapper>> messageQueueBuckets;

    public MessageDistributor(List<BlockingQueue<MessageWrapper>> messageQueueBuckets) {
        this.messageQueueBuckets = messageQueueBuckets;
    }

    public void handle(String appId, MessageWrapper message) {
        int index = (messageQueueBuckets.size() - 1) % hash(appId);
        try {
            messageQueueBuckets.get(index).offer(message);
        } catch (Exception e) {
            // handle exception
        }
    }
}

因为稍后我还需要回复消息,所以我将消息对象和 netty 通道包装在 MessageWrapper 中:

public class MessageWrapper {

    private final Channel channel;
    private final Message message;

    public MessageWrapper(Channel channel, Message message) {
        this.channel = channel;
        this.message = message;
    }

    public Channel getChannel() {
        return channel;
    }

    public Message getMessage() {
        return message;
    }
}

此外,还有一个消息消费者,它实现了一个 Runnable 并从分配的阻塞队列中获取新消息。这个人执行了一些 expensive/blocking 操作,我想在主 netty 事件循环之外进行这些操作,并且这些操作也不应该过多地阻止其他连接的客户端的操作,因此使用了几个队列:

public class MessageConsumer implements Runnable {

    private final BlockingQueue<MessageWrapper> messageQueue;

    public MessageConsumer(BlockingQueue<MessageWrapper> messageQueue) {
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                MessageWrapper msgWrap = messageQueue.take();
                Channel channel = msgWrap.getChannel();
                Message msg = msgWrap.getMessage();

                doSthExepnsiveOrBlocking(channel, msg);    

            } catch (Exception e) {
                // handle exception
            }
        }
    }
    public void doSthExepnsiveOrBlocking(Channel channel, Message msg) {
        // some expsive/blocking operations
        channe.writeAndFlush(someResultObj);
    }
}

所有 类 的设置如下所示(messageExecutor 是一个大小为 8 的 DefaultEventeExecutorGroup):

int nrOfWorkers = config.getNumberOfClientMessageQueues();
List<BlockingQueue<MessageWrapper>> messageQueueBuckets = new ArrayList<>(nrOfWorkers);

for (int i = 0; i < nrOfWorkers; i++) {
    messageQueueBuckets.add(new LinkedBlockingQueue<>());
}
MessageDistributor distributor = new MessageDistributor(messageQueueBuckets);
List<MessageConsumer> consumers = new ArrayList<>(nrOfWorkers);

for (BlockingQueue<MessageWrapper> messageQueueBucket : messageQueueBuckets) {
    MessageConsumer consumer = new MessageConsumer(messageQueueBucket);
    consumers.add(consumer);
    messageExecutor.submit(consumer);
}

我使用这种方法的目标是将连接的客户端彼此隔离(不完全,但至少有一点),并在不同线程上执行昂贵的操作。

现在我的问题是:将 netty 通道对象包装在这个 MessageWrapper 中供以后使用并在其他线程中访问它的 write 方法是否有效?

更新

我没有在 netty 之上构建额外的消息分发机制,而是决定简单地为我的阻塞通道处理程序使用一个单独的 EventExecutorGroup,看看它是如何工作的。

是的,从其他线程调用 Channel.* 方法是有效的。也就是说,当从属于 Channel.

EventLoop 线程本身调用这些方法时,这些方法执行得最好