消耗所有 Kafka 主题然后立即断开连接?

Consume all of a Kafka topic and then immediately disconnect?

我主要使用 Kafka 进行传统消息传递,但我也希望能够以批处理方式使用小主题,即连接到一个主题,使用所有消息并立即断开连接(不阻塞等待新消息消息)。我所有的主题都有一个分区(尽管它们在集群中被复制),如果可能的话我想使用高级消费者。从文档中不清楚我如何在 Scala(或 Java)中完成这样的事情。非常感谢收到任何建议。

如果之前没有消息被消费,consumer.timeout.ms 设置将在指定时间后抛出超时异常,这是高级消费者 afaik 的唯一选择。使用它,您可以将其设置为 1 秒左右,如果这是一个可接受的解决方案,则在此之后断开连接。

如果不是,则必须使用简单消费者并检查消息偏移量。