spark 无法使用新的 groupId 从 Kafka 获取消息
spark can not get message from Kafka with new groupId
我正在使用 spark streaming 读取来自 Kafka 的消息,它工作正常。但是我有一个要求需要重新阅读消息。我在想我可能只需要更改 spark 的客户 groupId 并重新启动 spark 流应用程序,它应该从头开始重新读取 kafka 消息。但是结果是Spark收不到任何消息,我很疑惑。通过 Kafka 文档,如果您更改客户 groupId,那么它应该从头开始收到消息,因为 kafka 将您视为新客户。提前致谢!
Kafka 消费者有一个名为 auto.offset.reset 的 属性(参见 Kafka Doc)。这告诉消费者在开始消费但尚未提交偏移量时该怎么做。这是你的情况。该主题有消息,但没有存储起始偏移量,因为您尚未阅读该新组 ID 下的任何内容。在这种情况下,使用 auto.offset.reset 属性。如果该值为 "largest",并且这是默认值),则开始位置将设置为最大偏移量(最后一个),您将获得所看到的行为。如果值为 "smallest",则偏移量设置为起始偏移量,消费者将读取整个分区。这就是你想要的。
所以我不太确定您如何在 Spark 应用程序中设置 Kafka 属性,但如果您想要新组 ID 以读取整个主题。
听起来您正在为 Kafka 使用基于 api 的 Spark Streaming 接收器。正如您所注意到的,api auto.offset.reset 仅在 ZK 中没有偏移量时才适用。
如果您希望能够指定确切的偏移量,请参阅以 fromOffsets 作为参数的 createDirectStream 调用的版本。
我正在使用 spark streaming 读取来自 Kafka 的消息,它工作正常。但是我有一个要求需要重新阅读消息。我在想我可能只需要更改 spark 的客户 groupId 并重新启动 spark 流应用程序,它应该从头开始重新读取 kafka 消息。但是结果是Spark收不到任何消息,我很疑惑。通过 Kafka 文档,如果您更改客户 groupId,那么它应该从头开始收到消息,因为 kafka 将您视为新客户。提前致谢!
Kafka 消费者有一个名为 auto.offset.reset 的 属性(参见 Kafka Doc)。这告诉消费者在开始消费但尚未提交偏移量时该怎么做。这是你的情况。该主题有消息,但没有存储起始偏移量,因为您尚未阅读该新组 ID 下的任何内容。在这种情况下,使用 auto.offset.reset 属性。如果该值为 "largest",并且这是默认值),则开始位置将设置为最大偏移量(最后一个),您将获得所看到的行为。如果值为 "smallest",则偏移量设置为起始偏移量,消费者将读取整个分区。这就是你想要的。
所以我不太确定您如何在 Spark 应用程序中设置 Kafka 属性,但如果您想要新组 ID 以读取整个主题。
听起来您正在为 Kafka 使用基于 api 的 Spark Streaming 接收器。正如您所注意到的,api auto.offset.reset 仅在 ZK 中没有偏移量时才适用。
如果您希望能够指定确切的偏移量,请参阅以 fromOffsets 作为参数的 createDirectStream 调用的版本。