PySpark 和卡夫卡 "Set are gone. Some data may have been missed.."
PySpark and Kafka "Set are gone. Some data may have been missed.."
我正在 运行在本地模式下使用 Spark 集群使用 PySpark,我正在尝试将流式数据帧写入 Kafka 主题。
当我 运行 查询时,我收到以下消息:
java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed..
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
这是我的代码:
query = (
output_stream
.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "ratings-cleaned")
.option("checkpointLocation", "checkpoints-folder")
.start()
)
sleep(2)
print(query.status)
此错误消息提示检查点存在问题。在开发过程中,这可能是由于使用具有更新查询的旧检查点文件夹引起的。
如果这是在开发环境中,并且您不需要保存之前查询的状态,您只需删除检查点文件夹(代码示例中的 checkpoints-folder
)并重新运行查询。
当某些 messages/offsets 自查询的最后一个 运行 后从源主题中删除时,通常会出现此错误消息。由于保留时间等清理策略而发生删除。
假设您的主题包含偏移量为 0、1、2 的消息,这些消息均已由应用程序处理。检查点文件存储最后一个偏移量 2 以记住下次启动时继续使用偏移量 3。
一段时间后,偏移量为 3、4、5 的消息生成到主题,但偏移量为 0、1、2、3 的消息由于保留而从主题中删除。
现在,当重新启动您的 spark 结构化流作业时,它会尝试根据其检查点文件获取 3,但意识到只有偏移量为 4 的消息可用。在这种情况下,它会抛出这个异常。
你可以通过
解决这个问题
- 在您的
readStream
操作中设置 .option("failOnDataLoss", "false")
,或
- 删除现有检查点文件
根据 Structured Streaming + Kafka Integration Guide 选项 failOnDataLoss
被描述为:
"Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected. Batch queries will always fail if it fails to read any data from the provided offsets due to lost data."
除了上述答案之外,Bartosz Konieczny 还发布了 a more detailed reason。错误信息的第一部分说 Set()
是空的;这是一组主题分区(因此 -0
最后)。也就是说,Spark集群订阅的分区已经被删除。我的猜测是 Kafka 设置已重新启动。 Spark 查询正在使用一些默认的检查点文件夹,假设 Kafka 设置没有重新启动。
我正在 运行在本地模式下使用 Spark 集群使用 PySpark,我正在尝试将流式数据帧写入 Kafka 主题。
当我 运行 查询时,我收到以下消息:
java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed..
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
这是我的代码:
query = (
output_stream
.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "ratings-cleaned")
.option("checkpointLocation", "checkpoints-folder")
.start()
)
sleep(2)
print(query.status)
此错误消息提示检查点存在问题。在开发过程中,这可能是由于使用具有更新查询的旧检查点文件夹引起的。
如果这是在开发环境中,并且您不需要保存之前查询的状态,您只需删除检查点文件夹(代码示例中的 checkpoints-folder
)并重新运行查询。
当某些 messages/offsets 自查询的最后一个 运行 后从源主题中删除时,通常会出现此错误消息。由于保留时间等清理策略而发生删除。
假设您的主题包含偏移量为 0、1、2 的消息,这些消息均已由应用程序处理。检查点文件存储最后一个偏移量 2 以记住下次启动时继续使用偏移量 3。
一段时间后,偏移量为 3、4、5 的消息生成到主题,但偏移量为 0、1、2、3 的消息由于保留而从主题中删除。
现在,当重新启动您的 spark 结构化流作业时,它会尝试根据其检查点文件获取 3,但意识到只有偏移量为 4 的消息可用。在这种情况下,它会抛出这个异常。
你可以通过
解决这个问题- 在您的
readStream
操作中设置.option("failOnDataLoss", "false")
,或 - 删除现有检查点文件
根据 Structured Streaming + Kafka Integration Guide 选项 failOnDataLoss
被描述为:
"Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected. Batch queries will always fail if it fails to read any data from the provided offsets due to lost data."
除了上述答案之外,Bartosz Konieczny 还发布了 a more detailed reason。错误信息的第一部分说 Set()
是空的;这是一组主题分区(因此 -0
最后)。也就是说,Spark集群订阅的分区已经被删除。我的猜测是 Kafka 设置已重新启动。 Spark 查询正在使用一些默认的检查点文件夹,假设 Kafka 设置没有重新启动。