如何将数据帧的每一行写入/写入流到不同的增量table

How to write / writeStream each row of a dataframe into a different delta table

我的数据框的每一行都有一个 CSV 内容。

我正在努力将每一行保存在不同的特定 table 中。

我认为我需要使用 foreach 或 UDF 才能完成此操作,但这根本行不通。

我设法找到的所有内容就像 foreachs 中的简单打印或使用 .collect() 的代码(我真的不想使用它)。

我也找到了重新分区的方法,但是那不允许我选择每行的位置。

rows = df.count()
df.repartition(rows).write.csv('save-dir')

你能给我一个简单有效的例子吗?

将每一行保存为 Table 是一项代价高昂的操作,不推荐这样做。但是你正在尝试的可以像这样实现 -

df.write.format("delta").partitionBy("<primary-key-column>").save("/delta/save-dir")

现在每一行都将保存为 .parquet 格式,您可以从每个分区创建外部 table。这仅在您对每一行都有唯一值时才有效,即主键。

好吧,总而言之,一如既往,它非常简单,但我根本没看到。

基本上,当您执行 foreach 并且要保存的数据帧构建在循环内时。 worker不像driver,不会在保存时自动设置“/dbfs/”路径,所以如果你不手动添加“/dbfs/”,它会在worker本地保存数据。

这就是我的循环不工作的原因。

你试过了吗.mode("append").repartionBy("ID"),它会为每个ID创建一个目录,然后不要忘记把模式