将 Databricks 中的 spark 数据帧写入 Azure Synapse 时出错
Error when write spark dataframe from Databricks into Azure Synapse
我正在尝试将 spark 数据帧写入 Azure Syanpse 数据库。
我的代码:
try:
re_spdf.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("encrypt", 'True') \
.option("trustServerCertificate", 'false') \
.option("hostNameInCertificate", '*.database.windows.net') \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
.option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')\
.save()
except ValueError as error :
print("Connector write failed", error)
错误信息:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 29.0 failed 4 times, most recent failure:
Lost task 1.3 in stage 29.0 (TID 885, 10.139.64.8, executor 0):
com.microsoft.sqlserver.jdbc.SQLServerException:
PdwManagedToNativeInteropException ErrorNumber: 46724, MajorCode: 467,
MinorCode: 24, Severity: 20, State: 2, Exception of type
'Microsoft.SqlServer.DataWarehouse.Tds.PdwManagedToNativeInteropException' was thrown.
我什至用谷歌搜索了这条错误消息。我没有得到任何有用的解决方案。
更新:我的工作环境是Databricks pyspark notebook。
如有任何建议,我们将不胜感激。
Azure databricks documentation 表示格式 com.databricks.spark.sqldw
到 read/write 数据 from/to 来自 Azure Synapse table.
的数据
如果您使用的是 Synapse,为什么不用 Synapse notebook,然后编写数据框就像调用 synapsesql
一样简单,例如
%%spark
df.write.synapsesql("yourPool.dbo.someXMLTable_processed", Constants.INTERNAL)
你会省去一些麻烦,而且性能应该很好,因为它是并行化的。这是主要文章:
https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-spark-sql-pool-import-export
突触数据库中存在一些列长度限制 table。它将只允许 4000 个字符。
所以当我使用 com.databricks.spark.sqldw
时,因为它使用 Polybase
作为连接器,我还需要更改 DB table 中列的长度。
参考:https://forums.databricks.com/questions/21032/databricks-throwing-error-sql-dw-failed-to-execute.html
代码:
df.write \
.format("com.databricks.spark.sqldw") \
.mode("append") \
.option("url", url) \
.option("user", username) \
.option("password", password) \
.option("maxStrLength", "4000" ) \
.option("tempDir", "tempdirdetails") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
.option("dbTable", table_name) \
.save()
我正在尝试将 spark 数据帧写入 Azure Syanpse 数据库。
我的代码:
try:
re_spdf.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("encrypt", 'True') \
.option("trustServerCertificate", 'false') \
.option("hostNameInCertificate", '*.database.windows.net') \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
.option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')\
.save()
except ValueError as error :
print("Connector write failed", error)
错误信息:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 29.0 failed 4 times, most recent failure:
Lost task 1.3 in stage 29.0 (TID 885, 10.139.64.8, executor 0):
com.microsoft.sqlserver.jdbc.SQLServerException:
PdwManagedToNativeInteropException ErrorNumber: 46724, MajorCode: 467,
MinorCode: 24, Severity: 20, State: 2, Exception of type
'Microsoft.SqlServer.DataWarehouse.Tds.PdwManagedToNativeInteropException' was thrown.
我什至用谷歌搜索了这条错误消息。我没有得到任何有用的解决方案。
更新:我的工作环境是Databricks pyspark notebook。
如有任何建议,我们将不胜感激。
Azure databricks documentation 表示格式 com.databricks.spark.sqldw
到 read/write 数据 from/to 来自 Azure Synapse table.
如果您使用的是 Synapse,为什么不用 Synapse notebook,然后编写数据框就像调用 synapsesql
一样简单,例如
%%spark
df.write.synapsesql("yourPool.dbo.someXMLTable_processed", Constants.INTERNAL)
你会省去一些麻烦,而且性能应该很好,因为它是并行化的。这是主要文章:
https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-spark-sql-pool-import-export
突触数据库中存在一些列长度限制 table。它将只允许 4000 个字符。
所以当我使用 com.databricks.spark.sqldw
时,因为它使用 Polybase
作为连接器,我还需要更改 DB table 中列的长度。
参考:https://forums.databricks.com/questions/21032/databricks-throwing-error-sql-dw-failed-to-execute.html
代码:
df.write \
.format("com.databricks.spark.sqldw") \
.mode("append") \
.option("url", url) \
.option("user", username) \
.option("password", password) \
.option("maxStrLength", "4000" ) \
.option("tempDir", "tempdirdetails") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
.option("dbTable", table_name) \
.save()