以编程方式停止 Alpakka Kafka 流的正确方法

Proper way to programmatically stop an Alpakka Kafka stream

我们正在尝试将 Akka Streams 与 Alpakka Kafka 结合使用,以在服务中使用事件流。为了处理事件处理错误,我们使用了 Kafka 自动提交和多个队列。例如,如果我们有主题 user_created,我们想从产品服务中使用它,我们还会创建 user_created_for_products_faileduser_created_for_products_dead_letter。这两个额外的主题耦合到特定的 Kafka 消费者组。如果一个事件处理失败,它会进入失败队列,我们​​会在五分钟后尝试再次消费——如果再次失败,它会进入死信。

在部署时,我们希望确保不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但所有这些 "flying" 的事件都尚未处理。停止流和应用程序后,我们可以部署新代码并再次启动应用程序。

阅读文档后,我们已经了解了 KillSwitch 功能。我们在其中看到的问题是 shutdown 方法 returns Unit 而不是我们预期的 Future[Unit] 。我们不确定使用它不会丢失事件,因为在测试中它看起来太快而无法正常工作。

作为解决方法,我们为每个流创建一个 ActorSystem 并使用 terminate 方法(returns 和 Future[Terminate])。这个解决方案的问题是我们不认为为每个流创建一个 ActorSystem 会很好地扩展,并且 terminate 需要很多时间来解决(在测试中它最多需要一分钟才能关闭下来)。

你遇到过这样的问题吗?有没有更快的方法(与 ActorSystem.terminate 相比)来停止流并确保 Source 发出的所有事件都已处理?

来自 documentation(强调我的):

When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.

val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()

Consumer.control.shutdown()returns一个Future[Done]。从它的 Scaladoc 描述:

Shutdown the consumer Source. It will wait for outstanding offset commit requests to finish before shutting down.

或者,如果您在 Kafka 中使用 偏移存储,请使用 Consumer.Control.drainAndShutdown,这也是 returns 和 Future。再次来自文档(其中包含有关 drainAndShutdown 幕后操作的更多信息):

val drainingControl =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()

drainAndShutdown 的 Scaladoc 描述:

Stop producing messages from the Source, wait for stream completion and shut down the consumer Source so that all consumed messages reach the end of the stream. Failures in stream completion will be propagated, the source will be shut down anyway.