如何获取spark结构化流中的写入记录数?
How to get number of written records in spark structured streaming?
我在单个 spark 会话中配置了一些结构化流。我需要知道每个流中读取和写入了多少记录。
例如,我有这两个流:
- 读取-s3 -> 转换 -> 写入-s3
- read-s3 -> transform -> write-db
我知道如何使用 SparkListener().onTaskEnd(),但那时我没有查询名称,而且 taskEnd.taskMetrics().outputMetrics().recordsWritten()
始终为 0,所以它不是一个选项。
另一种方法是在dataset.map() 中使用累加器来递增计算。但是这不是写入记录的数量而是要写入的记录(如果sink没有失败)。
除此之外,我尝试使用 StreamingQueryListener(我用它来获取 numInputRows
),但我找不到任何关于写入记录数的指标。
是否有可能获得此类指标?
2.3.1 版本修复了 a bug in FileStreamSink。
作为变通方法,在映射函数中使用 accumulators 在写入接收器之前计算记录数。
我在单个 spark 会话中配置了一些结构化流。我需要知道每个流中读取和写入了多少记录。 例如,我有这两个流:
- 读取-s3 -> 转换 -> 写入-s3
- read-s3 -> transform -> write-db
我知道如何使用 SparkListener().onTaskEnd(),但那时我没有查询名称,而且 taskEnd.taskMetrics().outputMetrics().recordsWritten()
始终为 0,所以它不是一个选项。
另一种方法是在dataset.map() 中使用累加器来递增计算。但是这不是写入记录的数量而是要写入的记录(如果sink没有失败)。
除此之外,我尝试使用 StreamingQueryListener(我用它来获取 numInputRows
),但我找不到任何关于写入记录数的指标。
是否有可能获得此类指标?
2.3.1 版本修复了 a bug in FileStreamSink。
作为变通方法,在映射函数中使用 accumulators 在写入接收器之前计算记录数。