Azure Synapse Analytics failed to execute the JDBC query produced by the connector with Databricks on Apache Spark

I'm trying to write to my Azure Synapse server from Databricks, but I keep getting the error:

Azure Synapse Analytics failed to execute the JDBC query produced by the connector

Here is the code:

blobStorage = "*******.blob.core.windows.net"
blobContainer = "synapsestagecontainer"
blobAccessKey = "***************"

# Staging directory used by the Azure Synapse connector
tempDir = "wasbs://" + blobContainer + "@" + blobStorage + "/tempDirs"

# Expose the storage account key to the Hadoop configuration
acntInfo = "fs.azure.account.key." + blobStorage
sc._jsc.hadoopConfiguration().set(acntInfo, blobAccessKey)

dwDatabase = "carlspool"
dwServer = "carlssynapseworkspace"
dwUser = "techadmin@carlssynapseworkspace"
dwPass = "*******"
dwJdbcPort = "1433"
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ".database.windows.net:" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass + ";" + dwJdbcExtraOptions
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ".database.windows.net:" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass


spark.conf.set(
   "spark.sql.parquet.writeLegacyFormat",
   "true")

example1.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable12").option("forward_spark_azure_storage_credentials", "True").option("tempdir", tempDir).mode("overwrite").save()

The full stack trace is:

Py4JJavaError                             Traceback (most recent call last)
<command-3898875195714724> in <module>
      4    "true")
      5 
----> 6 example1.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable12").option("forward_spark_azure_storage_credentials","True") .option("tempdir", tempDir).mode("overwrite").save()

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
   1132             self.format(format)
   1133         if path is None:
-> 1134             self._jwrite.save()
   1135         else:
   1136             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1761.save.
: com.databricks.spark.sqldw.SqlDWSideException: Azure Synapse Analytics failed to execute the JDBC query produced by the connector.
Underlying SQLException(s):
  - com.microsoft.sqlserver.jdbc.SQLServerException: HdfsBridge::recordReaderFillBuffer - Unexpected error encountered filling record reader buffer: HadoopSqlException: String or binary data would be truncated. [ErrorCode = 107090] [SQLState = S0001]
         
    at com.databricks.spark.sqldw.Utils$.wrapExceptions(Utils.scala:686)
    at com.databricks.spark.sqldw.DefaultSource.createRelation(DefaultSource.scala:89)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:196)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:167)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:166)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:468)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:311)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Exception thrown in awaitResult: 
    at com.databricks.spark.sqldw.JDBCWrapper.executeInterruptibly(SqlDWJDBCWrapper.scala:137)
    at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly(SqlDWJDBCWrapper.scala:115)
    at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly$adapted(SqlDWJDBCWrapper.scala:115)
    at com.databricks.spark.sqldw.JDBCWrapper.withPreparedStatement(SqlDWJDBCWrapper.scala:362)
    at com.databricks.spark.sqldw.JDBCWrapper.executeInterruptibly(SqlDWJDBCWrapper.scala:115)
    at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW(SqlDwWriter.scala:239)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:377)
    at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:363)
    at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:34)
    at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW(SqlDwWriter.scala:197)
    at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW$adapted(SqlDwWriter.scala:73)
    at com.databricks.spark.sqldw.JDBCWrapper.withConnection(SqlDWJDBCWrapper.scala:340)
    at com.databricks.spark.sqldw.SqlDwWriter.saveToSqlDW(SqlDwWriter.scala:73)
    at com.databricks.spark.sqldw.DefaultSource.$anonfun$createRelation(DefaultSource.scala:122)
    at com.databricks.spark.sqldw.Utils$.wrapExceptions(Utils.scala:655)
    ... 34 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: HdfsBridge::recordReaderFillBuffer - Unexpected error encountered filling record reader buffer: HadoopSqlException: String or binary data would be truncated.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1632)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:602)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:524)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7418)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3272)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:247)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:222)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.execute(SQLServerPreparedStatement.java:505)
    at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly(SqlDWJDBCWrapper.scala:115)
    at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly$adapted(SqlDWJDBCWrapper.scala:115)
    at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly(SqlDWJDBCWrapper.scala:129)
    at scala.concurrent.Future$.$anonfun$apply(Future.scala:659)
    at scala.util.Success.$anonfun$map(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

I know others have run into this problem with Databricks, and I have tried applying their answers to my situation, but I can't get it to work.

The full error is:

com.databricks.spark.sqldw.SqlDWSideException: Azure Synapse Analytics failed to execute the JDBC query produced by the connector.

I'm running Databricks Runtime 8.3.
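
The underlying SQLException above ("String or binary data would be truncated") points at rows whose string values are wider than the corresponding columns of the target table. A minimal diagnostic sketch for spotting the offending columns before the write, assuming example1 is the DataFrame from the code above:

from pyspark.sql import functions as F

# Report the longest value in every string column of the DataFrame,
# to compare against the NVARCHAR lengths of the target Synapse table.
string_cols = [f.name for f in example1.schema.fields
               if f.dataType.simpleString() == "string"]
example1.select(
    [F.max(F.length(F.col(c))).alias(c) for c in string_cols]
).show()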

I struggled with the same error for a few days until I arrived at the code below. I also created a secret scope, an external data source, and an external file format in my Synapse dedicated pool; a sketch of reading credentials from that secret scope follows the code.

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

df_gold = spark.read.format('delta').load('dbfs:/mnt/datalake/gold')

df = df_gold.select('faceId', 'name')

blobStorage = "<your storage name>.blob.core.windows.net"
blobContainer = "<your container name>"
blobAccessKey = "<your storage key>"

tempDir = "wasbs://" + blobContainer + "@" + blobStorage + "/tempDirs"

acntInfo = "fs.azure.account.key." + blobStorage

sc._jsc.hadoopConfiguration().set(acntInfo, blobAccessKey)

dwDatabase = "<your pool name>"
dwServer = "<your workspace name>.database.windows.net"
dwUser = "user"
dwPass = "pass"
dwJdbcPort = "1433"
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass + ";" + dwJdbcExtraOptions
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass

spark.conf.set(
    "spark.sql.parquet.writeLegacyFormat",
    "true")

(df
 .write
 .format("com.databricks.spark.sqldw")
 .option("url", sqlDwUrlSmall)
 .option("dbtable", "SampleTable")
 .option( "forward_spark_azure_storage_credentials","True")
 .option("tempdir", tempDir)
 .mode("overwrite")
 .save())
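
Since a secret scope already exists, the hard-coded user, pass, and storage key above can be replaced with lookups against it. A minimal sketch, assuming a scope named synapse-scope and hypothetical key names:

# Hypothetical scope and key names; substitute your own.
dwUser = dbutils.secrets.get(scope="synapse-scope", key="dw-user")
dwPass = dbutils.secrets.get(scope="synapse-scope", key="dw-pass")
blobAccessKey = dbutils.secrets.get(scope="synapse-scope", key="storage-key")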