使用 StreamingQueryListener QueryProgressEvent 时,Spark 结构化流获取的输入行数不正确

Spark Structured streaming getting incorrect number of input rows when using StreamingQueryListener QueryProgressEvent

我在使用 StreamingQueryListener 识别输入行数时遇到问题,我正在使用

queryProgress.progress().numInputRows()

当除了写入之外没有其他操作时,我得到正确的计数,但是当我添加某些操作(例如 df.count 或 df.isEmpty() 时,我的输入行数计数被打乱了。

非常感谢任何帮助

编辑

下面的代码有效

df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
  @Override
  public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
    streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
  }
}).start();

这给出了错误的计数

df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
  @Override
  public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
    streamDataset.count();                                    
    streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
  }
}).start();

备注

请忽略 write() 代码,在真实场景中数据正在写入 mysql

中定义的不止一个动作
streamDataset.count();                                    
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");

Spark 创建了两个“独立的”流,每个流都使用相同的数据。但是,两个流都在调用 onQueryProgress。这在同一时间发生,因为这两个流被包装到相同的 foreachBatch.

在您的特定情况下,与 count.

的输出相比,您在 NumInputRows 中看到的数据是原来的两倍

该系数将根据您的操作数量而增加。