Akka ConsistentHashingRoutingLogic 不一致地路由到同一个调度程序线程
Akka ConsistentHashingRoutingLogic not routing to the same dispatcher thread consistently
我正在尝试使用 Akka 的 ConsistentHashingRoutingLogic
来保证具有相同密钥的消息被路由到相同的 Actor。以 FIFO 顺序处理具有相同密钥的消息很重要。具有不同键的消息可以路由到不同的Actor并自由并行处理。我没有在分布式模式下使用 Akka。
这些消息实际上是 JSON 从 RabbitMQ 代理读取的消息,因此我的 Master actor 接收到 AMQP 消息并使用路由键作为消息键。相同的密钥也存在于消息本身中。参与者是 Spring 应用程序的一部分。
我的男主角是这样的:
@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {
private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);
private Router router;
@Autowired
public MessageHandlerMaster(final SpringProps springProps) {
List<Routee> routees = Stream.generate(() -> {
ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
getContext().watch(worker);
return new ActorRefRoutee(worker);
}).limit(5) //todo: configurable number of workers
.collect(Collectors.toList());
router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}
public void onReceive(Object message) {
if (message instanceof Message) {
Message amqpMessage = (Message) message;
String encoding = getMessageEncoding(amqpMessage);
try {
String json = new String(amqpMessage.getBody(), encoding);
String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
log.debug("Routing message based on routing key " + routingKey);
router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
} catch (UnsupportedEncodingException e) {
log.warn("Unknown content encoding sent in message! {}", encoding);
}
} else if (message instanceof Terminated) {
//if one of the routee's died, remove it and replace it
log.debug("Actor routee terminated!");
router.removeRoutee(((Terminated) message).actor());
ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
getContext().watch(r);
router = router.addRoutee(new ActorRefRoutee(r));
}
}
private static String getMessageEncoding(Message message) {
String encoding = message.getMessageProperties().getContentEncoding();
if ((encoding == null) || (encoding.equals(""))) {
encoding = "UTF-8";
}
return encoding;
}
}
我最初通过以下方式获得大师一次:
this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");
然后通过以下方式向其提交消息:
master.tell(message, ActorRef.noSender());
但是,当我从我的工作人员 onReceive()
打印日志时,我发现有时针对同一个密钥使用不同的调度程序线程。
还不清楚为什么有时同一个调度程序线程被用于 Master actor 和 Worker actor。这不应该是线程之间传递的异步消息吗?
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
正如你在这里看到的,用于处理密钥为 10420186 的消息的 Worker 的调度线程有时是 9,有时是 10。Master actor 有时也使用这 2 个线程。
我如何确定 ConsistentHashingRoutingLogic
确实在工作并且同一个线程处理具有相同密钥的消息?我的路由器初始化有什么问题吗?
所以@vrudkovsk 的评论是正确的。我认为您在线程和演员之间感到困惑。 Actor 只是内存中具有地址和邮箱的对象。调度程序本质上是与参与者一起执行操作的线程池。示例操作是:
- 从邮箱中取出消息以在 actor 中处理它
- 将邮件排队到邮箱。
不同的线程可以为同一个actor执行动作。这是调度员决定的。 Akka 确保一次只有一个线程处理一个 actor 中的消息。这并不意味着它将始终是同一个线程。
如果您想确保他们来自同一个演员,我建议使用 context.self.path
或 context.self.path.address
记录演员路径或地址,因为它们是同一 [=] 中的唯一标识符12=]。
我正在尝试使用 Akka 的 ConsistentHashingRoutingLogic
来保证具有相同密钥的消息被路由到相同的 Actor。以 FIFO 顺序处理具有相同密钥的消息很重要。具有不同键的消息可以路由到不同的Actor并自由并行处理。我没有在分布式模式下使用 Akka。
这些消息实际上是 JSON 从 RabbitMQ 代理读取的消息,因此我的 Master actor 接收到 AMQP 消息并使用路由键作为消息键。相同的密钥也存在于消息本身中。参与者是 Spring 应用程序的一部分。
我的男主角是这样的:
@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {
private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);
private Router router;
@Autowired
public MessageHandlerMaster(final SpringProps springProps) {
List<Routee> routees = Stream.generate(() -> {
ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
getContext().watch(worker);
return new ActorRefRoutee(worker);
}).limit(5) //todo: configurable number of workers
.collect(Collectors.toList());
router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}
public void onReceive(Object message) {
if (message instanceof Message) {
Message amqpMessage = (Message) message;
String encoding = getMessageEncoding(amqpMessage);
try {
String json = new String(amqpMessage.getBody(), encoding);
String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
log.debug("Routing message based on routing key " + routingKey);
router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
} catch (UnsupportedEncodingException e) {
log.warn("Unknown content encoding sent in message! {}", encoding);
}
} else if (message instanceof Terminated) {
//if one of the routee's died, remove it and replace it
log.debug("Actor routee terminated!");
router.removeRoutee(((Terminated) message).actor());
ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
getContext().watch(r);
router = router.addRoutee(new ActorRefRoutee(r));
}
}
private static String getMessageEncoding(Message message) {
String encoding = message.getMessageProperties().getContentEncoding();
if ((encoding == null) || (encoding.equals(""))) {
encoding = "UTF-8";
}
return encoding;
}
}
我最初通过以下方式获得大师一次:
this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");
然后通过以下方式向其提交消息:
master.tell(message, ActorRef.noSender());
但是,当我从我的工作人员 onReceive()
打印日志时,我发现有时针对同一个密钥使用不同的调度程序线程。
还不清楚为什么有时同一个调度程序线程被用于 Master actor 和 Worker actor。这不应该是线程之间传递的异步消息吗?
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
正如你在这里看到的,用于处理密钥为 10420186 的消息的 Worker 的调度线程有时是 9,有时是 10。Master actor 有时也使用这 2 个线程。
我如何确定 ConsistentHashingRoutingLogic
确实在工作并且同一个线程处理具有相同密钥的消息?我的路由器初始化有什么问题吗?
所以@vrudkovsk 的评论是正确的。我认为您在线程和演员之间感到困惑。 Actor 只是内存中具有地址和邮箱的对象。调度程序本质上是与参与者一起执行操作的线程池。示例操作是:
- 从邮箱中取出消息以在 actor 中处理它
- 将邮件排队到邮箱。
不同的线程可以为同一个actor执行动作。这是调度员决定的。 Akka 确保一次只有一个线程处理一个 actor 中的消息。这并不意味着它将始终是同一个线程。
如果您想确保他们来自同一个演员,我建议使用 context.self.path
或 context.self.path.address
记录演员路径或地址,因为它们是同一 [=] 中的唯一标识符12=]。