Akka ConsistentHashingRoutingLogic 不一致地路由到同一个调度程序线程

Akka ConsistentHashingRoutingLogic not routing to the same dispatcher thread consistently

我正在尝试使用 Akka 的 ConsistentHashingRoutingLogic 来保证具有相同密钥的消息被路由到相同的 Actor。以 FIFO 顺序处理具有相同密钥的消息很重要。具有不同键的消息可以路由到不同的Actor并自由并行处理。我没有在分布式模式下使用 Akka。

这些消息实际上是 JSON 从 RabbitMQ 代理读取的消息,因此我的 Master actor 接收到 AMQP 消息并使用路由键作为消息键。相同的密钥也存在于消息本身中。参与者是 Spring 应用程序的一部分。

我的男主角是这样的:

@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {

  private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);

  private Router router;

  @Autowired
  public MessageHandlerMaster(final SpringProps springProps) {

  List<Routee> routees = Stream.generate(() -> {
      ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
      getContext().watch(worker);
      return new ActorRefRoutee(worker);
    }).limit(5) //todo: configurable number of workers
      .collect(Collectors.toList());

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
  }

  public void onReceive(Object message) {
    if (message instanceof Message) {
      Message amqpMessage = (Message) message;
      String encoding = getMessageEncoding(amqpMessage);
      try {
        String json = new String(amqpMessage.getBody(), encoding);
        String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
        log.debug("Routing message based on routing key " + routingKey);
        router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
      } catch (UnsupportedEncodingException e) {
        log.warn("Unknown content encoding sent in message! {}", encoding);
      }
    } else if (message instanceof Terminated) {
      //if one of the routee's died, remove it and replace it
      log.debug("Actor routee terminated!");
      router.removeRoutee(((Terminated) message).actor());
      ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
      getContext().watch(r);
      router = router.addRoutee(new ActorRefRoutee(r));
    }
  }

  private static String getMessageEncoding(Message message) {
    String encoding = message.getMessageProperties().getContentEncoding();
    if ((encoding == null) || (encoding.equals(""))) {
      encoding = "UTF-8";
    }
    return encoding;
  }
}

我最初通过以下方式获得大师一次:

this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");

然后通过以下方式向其提交消息:

master.tell(message, ActorRef.noSender());

但是,当我从我的工作人员 onReceive() 打印日志时,我发现有时针对同一个密钥使用不同的调度程序线程。

还不清楚为什么有时同一个调度程序线程被用于 Master actor 和 Worker actor。这不应该是线程之间传递的异步消息吗?

16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186

正如你在这里看到的,用于处理密钥为 10420186 的消息的 Worker 的调度线程有时是 9,有时是 10。Master actor 有时也使用这 2 个线程。

我如何确定 ConsistentHashingRoutingLogic 确实在工作并且同一个线程处理具有相同密钥的消息?我的路由器初始化有什么问题吗?

所以@vrudkovsk 的评论是正确的。我认为您在线程和演员之间感到困惑。 Actor 只是内存中具有地址和邮箱的对象。调度程序本质上是与参与者一起执行操作的线程池。示例操作是:

  • 从邮箱中取出消息以在 actor 中处理它
  • 将邮件排队到邮箱。

不同的线程可以为同一个actor执行动作。这是调度员决定的。 Akka 确保一次只有一个线程处理一个 actor 中的消息。这并不意味着它将始终是同一个线程。

如果您想确保他们来自同一个演员,我建议使用 context.self.pathcontext.self.path.address 记录演员路径或地址,因为它们是同一 [=] 中的唯一标识符12=]。