将 netty 通道传递给队列并稍后将其用于不同线程上的写入是否有效?
Is it valid to pass netty channels to a queue and use it for writes on a different thread later on?
我有以下设置。有一个消息分发器根据名为 appId 的唯一标识符(每个连接的客户端)将入站客户端消息传播到配置数量的消息队列(在我的例子中是 LinkedBlockingQueues):
public class MessageDistributor {
private final List<BlockingQueue<MessageWrapper>> messageQueueBuckets;
public MessageDistributor(List<BlockingQueue<MessageWrapper>> messageQueueBuckets) {
this.messageQueueBuckets = messageQueueBuckets;
}
public void handle(String appId, MessageWrapper message) {
int index = (messageQueueBuckets.size() - 1) % hash(appId);
try {
messageQueueBuckets.get(index).offer(message);
} catch (Exception e) {
// handle exception
}
}
}
因为稍后我还需要回复消息,所以我将消息对象和 netty 通道包装在 MessageWrapper 中:
public class MessageWrapper {
private final Channel channel;
private final Message message;
public MessageWrapper(Channel channel, Message message) {
this.channel = channel;
this.message = message;
}
public Channel getChannel() {
return channel;
}
public Message getMessage() {
return message;
}
}
此外,还有一个消息消费者,它实现了一个 Runnable 并从分配的阻塞队列中获取新消息。这个人执行了一些 expensive/blocking 操作,我想在主 netty 事件循环之外进行这些操作,并且这些操作也不应该过多地阻止其他连接的客户端的操作,因此使用了几个队列:
public class MessageConsumer implements Runnable {
private final BlockingQueue<MessageWrapper> messageQueue;
public MessageConsumer(BlockingQueue<MessageWrapper> messageQueue) {
this.messageQueue = messageQueue;
}
@Override
public void run() {
while (true) {
try {
MessageWrapper msgWrap = messageQueue.take();
Channel channel = msgWrap.getChannel();
Message msg = msgWrap.getMessage();
doSthExepnsiveOrBlocking(channel, msg);
} catch (Exception e) {
// handle exception
}
}
}
public void doSthExepnsiveOrBlocking(Channel channel, Message msg) {
// some expsive/blocking operations
channe.writeAndFlush(someResultObj);
}
}
所有 类 的设置如下所示(messageExecutor 是一个大小为 8 的 DefaultEventeExecutorGroup
):
int nrOfWorkers = config.getNumberOfClientMessageQueues();
List<BlockingQueue<MessageWrapper>> messageQueueBuckets = new ArrayList<>(nrOfWorkers);
for (int i = 0; i < nrOfWorkers; i++) {
messageQueueBuckets.add(new LinkedBlockingQueue<>());
}
MessageDistributor distributor = new MessageDistributor(messageQueueBuckets);
List<MessageConsumer> consumers = new ArrayList<>(nrOfWorkers);
for (BlockingQueue<MessageWrapper> messageQueueBucket : messageQueueBuckets) {
MessageConsumer consumer = new MessageConsumer(messageQueueBucket);
consumers.add(consumer);
messageExecutor.submit(consumer);
}
我使用这种方法的目标是将连接的客户端彼此隔离(不完全,但至少有一点),并在不同线程上执行昂贵的操作。
现在我的问题是:将 netty 通道对象包装在这个 MessageWrapper
中供以后使用并在其他线程中访问它的 write 方法是否有效?
更新
我没有在 netty 之上构建额外的消息分发机制,而是决定简单地为我的阻塞通道处理程序使用一个单独的 EventExecutorGroup,看看它是如何工作的。
是的,从其他线程调用 Channel.*
方法是有效的。也就是说,当从属于 Channel
.
的 EventLoop
线程本身调用这些方法时,这些方法执行得最好
我有以下设置。有一个消息分发器根据名为 appId 的唯一标识符(每个连接的客户端)将入站客户端消息传播到配置数量的消息队列(在我的例子中是 LinkedBlockingQueues):
public class MessageDistributor {
private final List<BlockingQueue<MessageWrapper>> messageQueueBuckets;
public MessageDistributor(List<BlockingQueue<MessageWrapper>> messageQueueBuckets) {
this.messageQueueBuckets = messageQueueBuckets;
}
public void handle(String appId, MessageWrapper message) {
int index = (messageQueueBuckets.size() - 1) % hash(appId);
try {
messageQueueBuckets.get(index).offer(message);
} catch (Exception e) {
// handle exception
}
}
}
因为稍后我还需要回复消息,所以我将消息对象和 netty 通道包装在 MessageWrapper 中:
public class MessageWrapper {
private final Channel channel;
private final Message message;
public MessageWrapper(Channel channel, Message message) {
this.channel = channel;
this.message = message;
}
public Channel getChannel() {
return channel;
}
public Message getMessage() {
return message;
}
}
此外,还有一个消息消费者,它实现了一个 Runnable 并从分配的阻塞队列中获取新消息。这个人执行了一些 expensive/blocking 操作,我想在主 netty 事件循环之外进行这些操作,并且这些操作也不应该过多地阻止其他连接的客户端的操作,因此使用了几个队列:
public class MessageConsumer implements Runnable {
private final BlockingQueue<MessageWrapper> messageQueue;
public MessageConsumer(BlockingQueue<MessageWrapper> messageQueue) {
this.messageQueue = messageQueue;
}
@Override
public void run() {
while (true) {
try {
MessageWrapper msgWrap = messageQueue.take();
Channel channel = msgWrap.getChannel();
Message msg = msgWrap.getMessage();
doSthExepnsiveOrBlocking(channel, msg);
} catch (Exception e) {
// handle exception
}
}
}
public void doSthExepnsiveOrBlocking(Channel channel, Message msg) {
// some expsive/blocking operations
channe.writeAndFlush(someResultObj);
}
}
所有 类 的设置如下所示(messageExecutor 是一个大小为 8 的 DefaultEventeExecutorGroup
):
int nrOfWorkers = config.getNumberOfClientMessageQueues();
List<BlockingQueue<MessageWrapper>> messageQueueBuckets = new ArrayList<>(nrOfWorkers);
for (int i = 0; i < nrOfWorkers; i++) {
messageQueueBuckets.add(new LinkedBlockingQueue<>());
}
MessageDistributor distributor = new MessageDistributor(messageQueueBuckets);
List<MessageConsumer> consumers = new ArrayList<>(nrOfWorkers);
for (BlockingQueue<MessageWrapper> messageQueueBucket : messageQueueBuckets) {
MessageConsumer consumer = new MessageConsumer(messageQueueBucket);
consumers.add(consumer);
messageExecutor.submit(consumer);
}
我使用这种方法的目标是将连接的客户端彼此隔离(不完全,但至少有一点),并在不同线程上执行昂贵的操作。
现在我的问题是:将 netty 通道对象包装在这个 MessageWrapper
中供以后使用并在其他线程中访问它的 write 方法是否有效?
更新
我没有在 netty 之上构建额外的消息分发机制,而是决定简单地为我的阻塞通道处理程序使用一个单独的 EventExecutorGroup,看看它是如何工作的。
是的,从其他线程调用 Channel.*
方法是有效的。也就是说,当从属于 Channel
.
EventLoop
线程本身调用这些方法时,这些方法执行得最好