如何使用 alpakka、spring boot 和 Akka-stream 初始化一个连续的 运行 流?

how to initialize a continous running stream using alpakka, spring boot & Akka-stream?

全部,

我正在开发一个应用程序,它使用 alpakka spring 启动集成从 kafka 读取数据。我已经准备好大部分代码,唯一卡住的地方是如何初始化一个连续的 运行 流,因为这将是一个后端应用程序并且不会有任何 api 被调用?

据我所知,Alpakka 的 Spring 集成基本上是围绕通过 Spring HTTP 控制器公开 Akka 流而设计的。所以我不确定将 Spring 引入此服务的目的是什么,因为 Akka 应用程序倾向于工作的方式与 Spring 应用程序倾向于喜欢的方式之间存在相当大的阻抗不匹配上班。

假设您正在谈论使用 Alpakka Kafka,最惯用的做法是启动一个由 Alpakka Kafka Source in your main method and it will run until killed or it fails. You may want to use a RestartSource 围绕消费者和业务逻辑提供的流,以确保在发生故障时流重新启动(请注意,人们通常应该期望偏移量提交没有发生的消息再次被处理,因为 Kafka 在典型情况下只能保证至少一次处理)。