如何在 Spark Structured Streaming 中将静态数据帧与流式数据帧进行比较?

How to compare a static data frame with a streaming one in Spark Structured Streaming?

我需要比较两个 DataFrame。其中一个是静态的,另一个是流式的。 示例静态 DataFrame 如下所示:

 id, value
2786,  5
7252,  3
2525,  4
8038,  1

示例流数据帧如下所示:

 id, value
2786,  9
7252,  8
2525,  7

结果 DataFrame 应如下所示:

id, value
8038, 1

价值根本不重要。我只需要找到对于这个小批量我没有指定 id 8038 的值。我尝试为此使用 joins 和 subtract() 函数,但问题是流 - 静态连接不支持我需要的连接类型,并且当左侧的静态 DataFrame 时减法不起作用。例如这些表达式将 return 一个错误:

staticDF.subtract(streamingDF)
staticDF.join(streamingDF, staticDF.id = streamingDF.id, "left_anti")

在 Spark Structured Streaming 中,有没有办法获取 staticDF 中的 id 而不是 streamingDF 中的 id?

您可以使用 foreachBatch 接收器,然后对静态数据帧和微批使用左反连接。

streamingDf.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  println("------------------------")
  println("Batch "+batchId+ " data")
  println("Total Records " + batchDF.count())
  println("------------------------")
  staticDf.join(batchDF, staticDf("id") === batchDF("id"),"left_anti")
    .select(staticDf("*")).show()

//You can also write your output using any writer
//e.g. df.write.format("csv").save("src/test/resources")

}.start()

输入:

static df
+----+-----+
|  id|value|
+----+-----+
|2786|    5|
|7252|    3|
|2525|    4|
|8038|    1|
+----+-----+

streaming batch 0
2786,9
7252,8
2525,7

streaming batch 1
2786,9
7252,8

输出:

------------------------
Batch 0 data
Total Records 3
------------------------
+----+-----+
|  id|value|
+----+-----+
|8038|    1|
+----+-----+

------------------------
Batch 1 data
Total Records 2
------------------------
+----+-----+
|  id|value|
+----+-----+
|2525|    4|
|8038|    1|
+----+-----+