如何在 Spark Structured Streaming 中将静态数据帧与流式数据帧进行比较?
How to compare a static data frame with a streaming one in Spark Structured Streaming?
我需要比较两个 DataFrame。其中一个是静态的,另一个是流式的。
示例静态 DataFrame 如下所示:
id, value
2786, 5
7252, 3
2525, 4
8038, 1
示例流数据帧如下所示:
id, value
2786, 9
7252, 8
2525, 7
结果 DataFrame 应如下所示:
id, value
8038, 1
价值根本不重要。我只需要找到对于这个小批量我没有指定 id 8038 的值。我尝试为此使用 joins 和 subtract() 函数,但问题是流 - 静态连接不支持我需要的连接类型,并且当左侧的静态 DataFrame 时减法不起作用。例如这些表达式将 return 一个错误:
staticDF.subtract(streamingDF)
staticDF.join(streamingDF, staticDF.id = streamingDF.id, "left_anti")
在 Spark Structured Streaming 中,有没有办法获取 staticDF 中的 id 而不是 streamingDF 中的 id?
您可以使用 foreachBatch 接收器,然后对静态数据帧和微批使用左反连接。
streamingDf.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
println("------------------------")
println("Batch "+batchId+ " data")
println("Total Records " + batchDF.count())
println("------------------------")
staticDf.join(batchDF, staticDf("id") === batchDF("id"),"left_anti")
.select(staticDf("*")).show()
//You can also write your output using any writer
//e.g. df.write.format("csv").save("src/test/resources")
}.start()
输入:
static df
+----+-----+
| id|value|
+----+-----+
|2786| 5|
|7252| 3|
|2525| 4|
|8038| 1|
+----+-----+
streaming batch 0
2786,9
7252,8
2525,7
streaming batch 1
2786,9
7252,8
输出:
------------------------
Batch 0 data
Total Records 3
------------------------
+----+-----+
| id|value|
+----+-----+
|8038| 1|
+----+-----+
------------------------
Batch 1 data
Total Records 2
------------------------
+----+-----+
| id|value|
+----+-----+
|2525| 4|
|8038| 1|
+----+-----+
我需要比较两个 DataFrame。其中一个是静态的,另一个是流式的。 示例静态 DataFrame 如下所示:
id, value
2786, 5
7252, 3
2525, 4
8038, 1
示例流数据帧如下所示:
id, value
2786, 9
7252, 8
2525, 7
结果 DataFrame 应如下所示:
id, value
8038, 1
价值根本不重要。我只需要找到对于这个小批量我没有指定 id 8038 的值。我尝试为此使用 joins 和 subtract() 函数,但问题是流 - 静态连接不支持我需要的连接类型,并且当左侧的静态 DataFrame 时减法不起作用。例如这些表达式将 return 一个错误:
staticDF.subtract(streamingDF)
staticDF.join(streamingDF, staticDF.id = streamingDF.id, "left_anti")
在 Spark Structured Streaming 中,有没有办法获取 staticDF 中的 id 而不是 streamingDF 中的 id?
您可以使用 foreachBatch 接收器,然后对静态数据帧和微批使用左反连接。
streamingDf.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
println("------------------------")
println("Batch "+batchId+ " data")
println("Total Records " + batchDF.count())
println("------------------------")
staticDf.join(batchDF, staticDf("id") === batchDF("id"),"left_anti")
.select(staticDf("*")).show()
//You can also write your output using any writer
//e.g. df.write.format("csv").save("src/test/resources")
}.start()
输入:
static df
+----+-----+
| id|value|
+----+-----+
|2786| 5|
|7252| 3|
|2525| 4|
|8038| 1|
+----+-----+
streaming batch 0
2786,9
7252,8
2525,7
streaming batch 1
2786,9
7252,8
输出:
------------------------
Batch 0 data
Total Records 3
------------------------
+----+-----+
| id|value|
+----+-----+
|8038| 1|
+----+-----+
------------------------
Batch 1 data
Total Records 2
------------------------
+----+-----+
| id|value|
+----+-----+
|2525| 4|
|8038| 1|
+----+-----+