流式传输到 SQL Datawarehouse 时 Azure Databricks 缺少条目
Azure Databricks Missing entries when streaming to SQL Datawarehouse
首先我有以下说明,当上传 20.000 个文件时,我在数据库中得到了 20.000 条记录(每个文件只包含 1 个记录)。
aTracking = sqlContext.read.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")
aTracking.write \
.option('user', dwUser) \
.option('password', dwPass) \
.jdbc('jdbc:sqlserver://' + dwServer + ':' + dwJdbcPort + ';database=' + dwDatabase, 'stg_tr_energy_xmlin.csv_in', mode = 'append' )
然后,出于速度目的,我认为使用 Polybase 进行流式传输会更好...编码为...但我只有 +- 17.000 个条目。
aTracking = spark.readStream.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")
aTracking.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", sqlDwUrl) \
.option("tempDir", "wasbs://uploaddw@" + blobStorage + ".blob.core.windows.net/stream") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "stg_tr_energy_xmlin.csv_in") \
.option("checkpointLocation", "/checkpoint") \
.start()
任何可能导致此问题的建议?
在检查点位置跟踪结构化流查询的状态。 "Every streaming source is assumed to have offsets (similar to Kafka offsets (...)) to track the read position in the stream. The engine uses checkpointing and write ahead logs to record the offset range of the data being processed in each trigger"。有关详细信息,请参阅 Spark documentation(搜索检查点)。
因此,如果您想重新处理所有文件,请删除以下定义的检查点位置目录(或定义一个新目录):
.option("checkpointLocation", "/checkpoint").
另外 _spark_metadata 目标目录中的文件被检查,所以如果你想再次写入所有数据,你还应该清理目标目录(使用 Azure SQL 数据仓库临时目录).
首先我有以下说明,当上传 20.000 个文件时,我在数据库中得到了 20.000 条记录(每个文件只包含 1 个记录)。
aTracking = sqlContext.read.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")
aTracking.write \
.option('user', dwUser) \
.option('password', dwPass) \
.jdbc('jdbc:sqlserver://' + dwServer + ':' + dwJdbcPort + ';database=' + dwDatabase, 'stg_tr_energy_xmlin.csv_in', mode = 'append' )
然后,出于速度目的,我认为使用 Polybase 进行流式传输会更好...编码为...但我只有 +- 17.000 个条目。
aTracking = spark.readStream.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")
aTracking.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", sqlDwUrl) \
.option("tempDir", "wasbs://uploaddw@" + blobStorage + ".blob.core.windows.net/stream") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "stg_tr_energy_xmlin.csv_in") \
.option("checkpointLocation", "/checkpoint") \
.start()
任何可能导致此问题的建议?
在检查点位置跟踪结构化流查询的状态。 "Every streaming source is assumed to have offsets (similar to Kafka offsets (...)) to track the read position in the stream. The engine uses checkpointing and write ahead logs to record the offset range of the data being processed in each trigger"。有关详细信息,请参阅 Spark documentation(搜索检查点)。
因此,如果您想重新处理所有文件,请删除以下定义的检查点位置目录(或定义一个新目录):
.option("checkpointLocation", "/checkpoint").
另外 _spark_metadata 目标目录中的文件被检查,所以如果你想再次写入所有数据,你还应该清理目标目录(使用 Azure SQL 数据仓库临时目录).