使用 StreamingQueryListener QueryProgressEvent 时,Spark 结构化流获取的输入行数不正确
Spark Structured streaming getting incorrect number of input rows when using StreamingQueryListener QueryProgressEvent
我在使用 StreamingQueryListener 识别输入行数时遇到问题,我正在使用
queryProgress.progress().numInputRows()
当除了写入之外没有其他操作时,我得到正确的计数,但是当我添加某些操作(例如 df.count 或 df.isEmpty() 时,我的输入行数计数被打乱了。
非常感谢任何帮助
编辑
下面的代码有效
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
@Override
public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
}
}).start();
这给出了错误的计数
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
@Override
public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
streamDataset.count();
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
}
}).start();
备注
请忽略 write() 代码,在真实场景中数据正在写入 mysql
如
中定义的不止一个动作
streamDataset.count();
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
Spark 创建了两个“独立的”流,每个流都使用相同的数据。但是,两个流都在调用 onQueryProgress
。这在同一时间发生,因为这两个流被包装到相同的 foreachBatch
.
在您的特定情况下,与 count
.
的输出相比,您在 NumInputRows
中看到的数据是原来的两倍
该系数将根据您的操作数量而增加。
我在使用 StreamingQueryListener 识别输入行数时遇到问题,我正在使用
queryProgress.progress().numInputRows()
当除了写入之外没有其他操作时,我得到正确的计数,但是当我添加某些操作(例如 df.count 或 df.isEmpty() 时,我的输入行数计数被打乱了。
非常感谢任何帮助
编辑
下面的代码有效
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
@Override
public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
}
}).start();
这给出了错误的计数
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
@Override
public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
streamDataset.count();
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
}
}).start();
备注
请忽略 write() 代码,在真实场景中数据正在写入 mysql
如
中定义的不止一个动作streamDataset.count();
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
Spark 创建了两个“独立的”流,每个流都使用相同的数据。但是,两个流都在调用 onQueryProgress
。这在同一时间发生,因为这两个流被包装到相同的 foreachBatch
.
在您的特定情况下,与 count
.
NumInputRows
中看到的数据是原来的两倍
该系数将根据您的操作数量而增加。