Delta Table 到 Spark Streaming 到 azure databricks 中的 Synapse Table

Delta Table to Spark Streaming to Synapse Table in azure databricks

我需要编写合并的 DELTA Tables 并将其同步到 Azure 数据仓库。我们正在尝试读取 Delta Table,但 Spark 流不允许将流写入 Synapse Tables。

然后我尝试读取 DBFS 文件夹中镶木地板文件中的 DELTA 表,我们能够批量读取和 insert/write 数据到 Synapse DWH Tables 但我们无法保留 DELTA表与 Synapse 同步 Tables.

我们怎样才能只在 Synapse 表中有最新的快照?我们没有使用 Synapse Analytics 工作区并尝试使用数据块中的火花流作业来实现这一点。

任何指点都会有所帮助。

您可以尝试通过在写入流的 forEachBatch 中附加新数据数据帧来保持数据同步,此方法允许以任意方式写入数据,您可以使用 jdbc 连接到 Datawarehouse如有必要:

df = spark.readStream\
          .format('delta')\
          .load(input_path)

df_write = df.writeStream \
            .format("delta") \
            .foreachBatch(batch_process) \
            .option("checkpointLocation", delat_chk_path) \
            .start(sink_path)\

你的批处理函数是这样的:

def batch_process(df, batchId):
  
    df = df.transformAsNeeded()
    df.write.jdbc(jdbc_url, table=schema_name + "." + table_name, mode="append", properties=connection_properties)

当然,如果增量插入适合您的问题,您也可以尝试保留“最终”table 或时间视图,其中填充了您想要的结果数据的快照,当在数据块中生成时,截断数据仓库中的目标 table 并用此视图覆盖它(当然使用 jdbc ),这可能很慢,如果我没记错的话可能有一个突触连接器。您可以尝试类似的方法,将文件以 CSV、parquet 或 delta 格式直接写入您的存储帐户,并在带有数据集的突触中引用它。您还可以进行批量更新,也许使用数据工厂中的集成管道,您可以根据需要使用数据块或笔记本执行。