作为具有永久邮箱的无状态参与者

Akka stateless actors with Persistent Mailboxes

我想创建一个包含 1000 个 actor 的 Akka 集群。每个参与者都会收到一条消息,进行一些计算并将结果写入专门的 Kafka 主题。

应该部署在集群中,比如Kubernetes。

我的理解是 - 如果 actor 因任何原因终止(JVM 崩溃、重新部署或其他任何原因),那么其邮箱的内容 - 以及当前正在处理的消息 - 都会丢失!

这对我来说是完全不能接受的,因此我想实现一种拥有持久邮箱的方法。请注意,演员本身是无状态的,他们不需要重播消息或重建状态。我所需要的只是在 actor 终止时不丢失消息。

问题是:执行此操作的推荐方法是什么? and here 他们建议实施持久性参与者。但是就像我说的,我不需要坚持和恢复actor的任何状态。我是否应该实施基于持久存储(如 SQL 数据库)的自定义邮箱?

我也看到之前Akka的某个版本支持"durable"个邮箱,好像是我需要的。但出于某种原因他们删除了它,这令人困惑...

你可以用Kafka来实现你想要的。 Kafka 主题是持久的(如果您将 Kafka 中的保留设置为 forever 或在主题上启用日志压缩,则数据将保留 'for all time' 或者您可以将偏移量存储在 Kafka 之外)。

使用 Akka Streams,您将在广播您生成的消息(关于生成主题)后提交您收到的消息(关于接收主题),给您 "at-least-once” delivery semantics. (for "exactly-once", 你可以查看 Kafka Transactions)

这是来自 Alpakka Kafka 文档的示例:

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .map(
            msg ->
                ProducerMessage.single(
                    new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
                    msg.committableOffset() // the passThrough
                    ))
        .via(Producer.flexiFlow(producerSettings))
        .map(m -> m.passThrough())
        .toMat(Committer.sink(committerSettings), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

您可以通过几种方式将其与(集群池)Actor 集成。最简单的方法是使用 Ask 模式。在这种情况下,流会将消息传递给必须在预定义时间内回复的参与者(可能是 self())。当收到回复时,它将在提交原始消息之前在目标流上广播。

这看起来像:

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
            .mapAsync(1, msg -> 
                Patterns.ask(actor, msg, Duration.ofSeconds(5))
                    .thenApply(done ->
                        ProducerMessage.single(
                                new ProducerRecord<>(targetTopic, done.key(), done.value()),
                                msg.committableOffset() // the passThrough
                        )
                    )
            )
            .via(Producer.flexiFlow(producerSettings))
            .map(m -> m.passThrough())
            .toMat(Committer.sink(committerSettings), Keep.both())
            .mapMaterializedValue(Consumer::createDrainingControl)
            .run(materializer);

如果您有多个可以同时处理消息的参与者,您还可以增加 mapAsync 调用的并行度。

客户端 上使用持久性参与者是针对此类要求的建议。我知道你是说你的接收演员不需要 persistence/statefulness,但是通过在客户端上使用持久性,你要么在接收演员终止时重试,要么使用开箱即用的保证消息传递功能来确保它被处理了。本质上,持久性(在客户端)用于持久化发出的请求,以便 客户端 可以在必要时将消息重新发送到 "rebuild the mailbox"。

使用客户端持久化是:

  • 比持久邮箱更高效
  • 防止出现更多故障情况(例如在网络层丢弃的消息、应用程序逻辑中的故障)
  • 更灵活,支持更多类型的恢复(例如:只需要恢复部分消息的场景)

这就是 Akka 删除持久邮箱的原因:Akka Persistence/Guaranteed At Least Once delivery 本质上是比持久邮箱更好的解决方案。

stikkos 使用 Kafka 的答案也是可行的。我只是担心引入 Kafka 会增加很多复杂性。当然,任何持久性存储都会增加复杂性,所以我想这仅取决于您已经拥有的东西。