如何重置 REST 代理中的偏移量以重新使用 Kafka 消息?

How to reset the offset in REST proxy for reconsuming Kafka messages?

我消费了一些来自Kafka队列的消息,现在我想再次消费它们(只需从头获取所有数据)。如何在 Scala 中或使用 Kafka Rest Proxy 重置偏移量?我目前正在使用 "auto.offset.reset": "smallest":

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"format": "json", "auto.offset.reset": "smallest"}' \
      XXX.XX.XXX.XXX:9092/consumers/test

请注意我对解决方案不感兴趣

URL 中的最后一个参数是 group Id。 Kafka 存储每个组 ID 的提交偏移量。因此,更改组 ID 应该就足够了 (.../consumers/newGroupId).