为什么我的 writeStream 函数只写一行?

Why is my writeStream function writing only one line?

我一直在努力理解这种奇怪的 Spark 流行为。

我想使用 Spark Streaming 将 2 个 CSV 文件写入增量 table。

我做这个例子只是为了了解 Streams 是如何工作的,我不想使用其他解决方案我只需要了解为什么这不起作用。

所以,我必须在 /test/input:

中生成 CSV 文件
A.csv
+---+---+
| id|  x|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
+---+---+

B.csv
+---+---+
| id|  x|
+---+---+
|  4|  D|
|  5|  E|
+---+---+

我将目录(上面两个数据帧的联合)作为流读取:

schema = StructType([StructField("id",IntegerType(),True), StructField("x",StringType(),True)])                

df = spark.readStream.format("csv").schema(schema).option("ignoreChanges", "true").option("delimiter", ";").option("header", True).load("/test/input")

然后我想使用以下代码编写此流:

def processDf(df, epoch_id):
  Ids=[x.id for x in df.select("id").distinct().collect()]
  for i in Ids:
    temp_df=df.filter((df.id==i))
    temp_df.write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id=="+str(i)).mode("append").save("/test/res")
  
  
df.writeStream.format("delta").foreachBatch(processDf).queryName("x").option("checkpointLocation", "/test/check").trigger(once=True).start()

未显示任何错误。代码执行成功。 当我去检查 /test/res 中的文件时,我发现所有数据:

但是当我检查增量数据时,我注意到只有第一行:

df= (spark.read.format("delta").option("sep", ";").option("header", "true").load("/test/res")).cache()

+---+---+
| id|  x|
+---+---+
|  1|  A|
+---+---+

为什么不插入所有行?是 replaceWhere 选项吗? replaceWhere 应该只删除已经在 table 中并在源数据中更新的分区。

请问我做错了什么。

编辑:

即使我在输入中只读取一个 CSV,也会注意到相同的行为。代码仍然只在输出中写入一行而不是所有行。

这实际上是一个语法错误,我用以下内容更改了循环块并且它起作用了:

for i in ids:
    i=str(i)
    tmp = df.filter(df.id == i)
    tmp.write.format("delta").option("mergeSchema", "true").partitionBy(PartitionKey).option("replaceWhere", "id == '$i'".format(i=i)).save("/res/")