为什么我的 writeStream 函数只写一行?
Why is my writeStream function writing only one line?
我一直在努力理解这种奇怪的 Spark 流行为。
我想使用 Spark Streaming 将 2 个 CSV 文件写入增量 table。
我做这个例子只是为了了解 Streams 是如何工作的,我不想使用其他解决方案我只需要了解为什么这不起作用。
所以,我必须在 /test/input
:
中生成 CSV 文件
A.csv
+---+---+
| id| x|
+---+---+
| 1| A|
| 2| B|
| 3| C|
+---+---+
B.csv
+---+---+
| id| x|
+---+---+
| 4| D|
| 5| E|
+---+---+
我将目录(上面两个数据帧的联合)作为流读取:
schema = StructType([StructField("id",IntegerType(),True), StructField("x",StringType(),True)])
df = spark.readStream.format("csv").schema(schema).option("ignoreChanges", "true").option("delimiter", ";").option("header", True).load("/test/input")
然后我想使用以下代码编写此流:
def processDf(df, epoch_id):
Ids=[x.id for x in df.select("id").distinct().collect()]
for i in Ids:
temp_df=df.filter((df.id==i))
temp_df.write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id=="+str(i)).mode("append").save("/test/res")
df.writeStream.format("delta").foreachBatch(processDf).queryName("x").option("checkpointLocation", "/test/check").trigger(once=True).start()
未显示任何错误。代码执行成功。
当我去检查 /test/res 中的文件时,我发现所有数据:
但是当我检查增量数据时,我注意到只有第一行:
df= (spark.read.format("delta").option("sep", ";").option("header", "true").load("/test/res")).cache()
+---+---+
| id| x|
+---+---+
| 1| A|
+---+---+
为什么不插入所有行?是 replaceWhere
选项吗?
replaceWhere 应该只删除已经在 table 中并在源数据中更新的分区。
请问我做错了什么。
编辑:
即使我在输入中只读取一个 CSV,也会注意到相同的行为。代码仍然只在输出中写入一行而不是所有行。
这实际上是一个语法错误,我用以下内容更改了循环块并且它起作用了:
for i in ids:
i=str(i)
tmp = df.filter(df.id == i)
tmp.write.format("delta").option("mergeSchema", "true").partitionBy(PartitionKey).option("replaceWhere", "id == '$i'".format(i=i)).save("/res/")
我一直在努力理解这种奇怪的 Spark 流行为。
我想使用 Spark Streaming 将 2 个 CSV 文件写入增量 table。
我做这个例子只是为了了解 Streams 是如何工作的,我不想使用其他解决方案我只需要了解为什么这不起作用。
所以,我必须在 /test/input
:
A.csv
+---+---+
| id| x|
+---+---+
| 1| A|
| 2| B|
| 3| C|
+---+---+
B.csv
+---+---+
| id| x|
+---+---+
| 4| D|
| 5| E|
+---+---+
我将目录(上面两个数据帧的联合)作为流读取:
schema = StructType([StructField("id",IntegerType(),True), StructField("x",StringType(),True)])
df = spark.readStream.format("csv").schema(schema).option("ignoreChanges", "true").option("delimiter", ";").option("header", True).load("/test/input")
然后我想使用以下代码编写此流:
def processDf(df, epoch_id):
Ids=[x.id for x in df.select("id").distinct().collect()]
for i in Ids:
temp_df=df.filter((df.id==i))
temp_df.write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id=="+str(i)).mode("append").save("/test/res")
df.writeStream.format("delta").foreachBatch(processDf).queryName("x").option("checkpointLocation", "/test/check").trigger(once=True).start()
未显示任何错误。代码执行成功。 当我去检查 /test/res 中的文件时,我发现所有数据:
但是当我检查增量数据时,我注意到只有第一行:
df= (spark.read.format("delta").option("sep", ";").option("header", "true").load("/test/res")).cache()
+---+---+
| id| x|
+---+---+
| 1| A|
+---+---+
为什么不插入所有行?是 replaceWhere
选项吗?
replaceWhere 应该只删除已经在 table 中并在源数据中更新的分区。
请问我做错了什么。
编辑:
即使我在输入中只读取一个 CSV,也会注意到相同的行为。代码仍然只在输出中写入一行而不是所有行。
这实际上是一个语法错误,我用以下内容更改了循环块并且它起作用了:
for i in ids:
i=str(i)
tmp = df.filter(df.id == i)
tmp.write.format("delta").option("mergeSchema", "true").partitionBy(PartitionKey).option("replaceWhere", "id == '$i'".format(i=i)).save("/res/")