将 Databricks 中的 spark 数据帧写入 Azure Synapse 时出错

Error when write spark dataframe from Databricks into Azure Synapse

我正在尝试将 spark 数据帧写入 Azure Syanpse 数据库。

我的代码:

try:
  re_spdf.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .option("encrypt", 'True') \
    .option("trustServerCertificate", 'false') \
    .option("hostNameInCertificate", '*.database.windows.net') \
    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
    .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')\
    .save()
except ValueError as error :
    print("Connector write failed", error)

错误信息:

org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 1 in stage 29.0 failed 4 times, most recent failure: 
Lost task 1.3 in stage 29.0 (TID 885, 10.139.64.8, executor 0):
com.microsoft.sqlserver.jdbc.SQLServerException:
PdwManagedToNativeInteropException ErrorNumber: 46724, MajorCode: 467, 
MinorCode: 24, Severity: 20, State: 2, Exception of type 
'Microsoft.SqlServer.DataWarehouse.Tds.PdwManagedToNativeInteropException' was thrown.

我什至用谷歌搜索了这条错误消息。我没有得到任何有用的解决方案。

更新:我的工作环境是Databricks pyspark notebook。

如有任何建议,我们将不胜感激。

Azure databricks documentation 表示格式 com.databricks.spark.sqldw 到 read/write 数据 from/to 来自 Azure Synapse table.

的数据

如果您使用的是 Synapse,为什么不用 Synapse notebook,然后编写数据框就像调用 synapsesql 一样简单,例如

%%spark
df.write.synapsesql("yourPool.dbo.someXMLTable_processed", Constants.INTERNAL)

你会省去一些麻烦,而且性能应该很好,因为它是并行化的。这是主要文章:

https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-spark-sql-pool-import-export

突触数据库中存在一些列长度限制 table。它将只允许 4000 个字符。

所以当我使用 com.databricks.spark.sqldw 时,因为它使用 Polybase 作为连接器,我还需要更改 DB table 中列的长度。 参考:https://forums.databricks.com/questions/21032/databricks-throwing-error-sql-dw-failed-to-execute.html

代码:

df.write \
  .format("com.databricks.spark.sqldw") \
  .mode("append") \
  .option("url", url) \
  .option("user", username) \
  .option("password", password) \
  .option("maxStrLength", "4000" ) \
  .option("tempDir", "tempdirdetails") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
  .option("dbTable", table_name) \
  .save()