当我使用 akka 流在现有消费者组中创建新消费者时,如何寻求结束 kafka 主题?

How do I seek to end of a kafka topic when I am creating a new consumer in an existing consumer group with akka streams?

我有一个 akka 应用程序(在 JAVA 中)使用 commitablePartitionedSource 来消费来自 kafka 主题的消息。我有几个消费者群体,可以为多个主题吸引消费者。这是由动态配置驱动的,我可以在其中暂时关闭消费者,并可能在稍后重新启动它们。

当这个消费者重新启动时,我只想阅读新消息,而不是从我离开的地方开始。

有没有办法从 akka-alpakka 消费者那里获取 kafkaConsumer 对象,以便我可以在处理之前使用 seekToEnd()? 请让我知道是否还有其他方法可以实现这一目标?也许使用 akka 配置或不同类型的消费者?我不想维护自己的偏移量(希望不是唯一的选择)

我的配置设置为在启动消费者组时获取 latest 偏移量,但由于我正在关闭并重新启动单个消费者,它总是从我停止的地方开始消费。

我尝试为一个主题创建一个消费者组,但我有很多主题,结果非常耗费资源。我还寻找一种方法来清除存储在 kafka 中的该主题的偏移量,但未成功。

最简单的方法就是在每次启动重新启动消费者时创建一个新的消费者组。 Kafka 将在一段可配置的时间后(retention.ms)负责移除陈旧的消费者组

如果您很少重启消费者并且总是希望它处理新数据而不是赶上所有丢失的消息,那么此策略很好。

编辑

据我所知,访问底层 KafkaConsumer 的唯一方法是使用 committableExternalSource。这样您就可以访问 seekToEnd 方法,但是您还需要注意订阅提供每个分区的起始偏移量的主题(类似于您现在设置 committablePartitionedSource 但在 Akka).

之外

commitablePartitionedSourceAutoSubscription 作为输入,您不能指定偏移量。

你需要的是ManualSubscription或更高级别Subscription的方法,比如

> plainExternalSource
> committableExternalSource
> plainSource
...