跳过 spark 结构化流处理中的批次
Skipping of batches in spark structured streaming process
我有一个 spark 结构化流作业,它使用来自 azure 事件中心服务的事件。在某些情况下,流式作业不会处理某些批次。在这种情况下,可以在结构化流式日志中看到以下日志记录语句:
INFO FileStreamSink: Skipping already committed batch 25
流式传输作业将传入事件保存到 Azure Datalake 中,因此我可以检查哪些事件实际上已经 processed/persisted。当发生上述跳过时,这些事件就丢失了!
我不清楚,为什么这些批次被标记为已提交,因为最后似乎它们没有被处理!
您是否知道可能导致此行为的原因?
谢谢!
我可以解决这个问题。问题是我有两个不同的流式处理作业,它们具有不同的检查点位置(这是正确的)但使用相同的基本文件夹作为它们的输出。但是在输出文件夹中还保存了元信息,因此两个流共享了他们已经提交的批次的信息。使用不同的基本输出文件夹后,问题得到解决。
我们遇到了同样的问题,Kafka 代理已经删除了数据。因此,为了强制 Spark 应用程序从头开始(Kafka 中的最新偏移量),我们删除了 checkpoint
和 _spark_metadata
目录。您可以在写入流的同一路径中找到 _spark_metadata
。
我有一个 spark 结构化流作业,它使用来自 azure 事件中心服务的事件。在某些情况下,流式作业不会处理某些批次。在这种情况下,可以在结构化流式日志中看到以下日志记录语句:
INFO FileStreamSink: Skipping already committed batch 25
流式传输作业将传入事件保存到 Azure Datalake 中,因此我可以检查哪些事件实际上已经 processed/persisted。当发生上述跳过时,这些事件就丢失了!
我不清楚,为什么这些批次被标记为已提交,因为最后似乎它们没有被处理!
您是否知道可能导致此行为的原因?
谢谢!
我可以解决这个问题。问题是我有两个不同的流式处理作业,它们具有不同的检查点位置(这是正确的)但使用相同的基本文件夹作为它们的输出。但是在输出文件夹中还保存了元信息,因此两个流共享了他们已经提交的批次的信息。使用不同的基本输出文件夹后,问题得到解决。
我们遇到了同样的问题,Kafka 代理已经删除了数据。因此,为了强制 Spark 应用程序从头开始(Kafka 中的最新偏移量),我们删除了 checkpoint
和 _spark_metadata
目录。您可以在写入流的同一路径中找到 _spark_metadata
。