There is already an object named '<table_name>' in the database

I am using PySpark in Databricks, run through an ADF pipeline, to append data to a SQL table. The code is as follows:

log_status_df_all = spark.createDataFrame(log_status_df_all)
(log_status_df_all.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode(write_mode)
    .option("url", url)
    .option("dbtable", 'Logs_Collection_Status')
    .option("user", username)
    .option("password", password)
    .save())
log_status_df_all.show()

From time to time I get the following error:

com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named '<table_name>' in the database.

Simply re-running the pipeline updates the table without any problem, so the code itself works. How can I prevent this from happening again? Could the error be caused by several pipelines trying to write to the same table at the same time?

The rest of the error message is shown below:

---------------------------------------------------------------------------  
Py4JJavaError                             Traceback (most recent call last) 
<command-3143827225825384> in <module>

      8 
      9   log_collection_df = spark.createDataFrame(log_collection_df)
---> 10   write_df_sql(log_collection_df, 'Logs_Collection_Status', 'overwrite')
     11 

<command-1421348210166948> in write_df_sql(df, table, write_mode)
     14 
     15 
---> 16   spark_df.write.format("com.microsoft.sqlserver.jdbc.spark").mode(write_mode).option("url", url).option("dbtable", table_name).option("user", username).option("password", password).save()
     17 
     18   #backup table

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    735             self.format(format)
    736         if path is None:
--> 737             self._jwrite.save()
    738         else:
    739             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o661.save.
: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'Logs_Collection_Status' in the database.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:845)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:752)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:680)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.executeUpdate(BulkCopyUtils.scala:456)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.mssqlCreateTable(BulkCopyUtils.scala:495)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceConnector$.createTable(SingleInstanceConnector.scala:33)
    at com.microsoft.sqlserver.jdbc.spark.Connector.write(Connector.scala:60)
    at com.microsoft.sqlserver.jdbc.spark.DefaultSource.createRelation(DefaultSource.scala:51)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:193)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:117)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv.apply(SQLExecution.scala:113)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:243)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:173)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:307)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.GeneratedMethodAccessor841.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
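If concurrent pipeline runs are indeed the trigger, one generic mitigation is to retry the failed write after a short back-off. Below is a minimal sketch; write_with_retry is a hypothetical helper (not part of the original notebook), and url, username, and password are assumed to come from the existing configuration:

import time
from py4j.protocol import Py4JJavaError

def write_with_retry(df, table_name, write_mode, retries=3, delay_s=30):
    # Hypothetical helper: retry the JDBC write on a transient failure.
    for attempt in range(1, retries + 1):
        try:
            (df.write.format("com.microsoft.sqlserver.jdbc.spark")
                .mode(write_mode)
                .option("url", url)
                .option("dbtable", table_name)
                .option("user", username)
                .option("password", password)
                .save())
            return
        except Py4JJavaError:
            if attempt == retries:
                raise  # out of attempts; surface the error to the pipeline
            time.sleep(delay_s)  # back off, then try again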

The issue is now resolved. In overwrite mode this connector drops and recreates the target table (note the mssqlCreateTable frame in the stack trace above), so when two runs hit the same table at the same time, both can attempt CREATE TABLE and the slower one fails with "There is already an object named ... in the database". Removing the conditional overwrite eliminated the race.

Previous code:

if condition1:
    write_mode = 'overwrite'
elif condition2:
    write_mode = 'append'
# Note: write_mode is left undefined when neither condition holds.


log_status_df_all = spark.createDataFrame(log_status_df_all)
(log_status_df_all.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode(write_mode)
    .option("url", url)
    .option("dbtable", 'Logs_Collection_Status')
    .option("user", username)
    .option("password", password)
    .save())
log_status_df_all.show()

Current code:

write_mode = 'append'

log_status_df_all = spark.createDataFrame(log_status_df_all)
(log_status_df_all.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode(write_mode)
    .option("url", url)
    .option("dbtable", 'Logs_Collection_Status')
    .option("user", username)
    .option("password", password)
    .save())
log_status_df_all.show()
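If an overwrite is ever needed again, one way to avoid the DROP/CREATE race is the truncate writer option, which the standard Spark JDBC writer uses to empty the existing table instead of dropping and recreating it. A minimal sketch, under the assumption that this connector honors that option in overwrite mode (verify against the connector's documentation before relying on it):

# Sketch: overwrite without dropping the table.
# Assumes com.microsoft.sqlserver.jdbc.spark honors the standard
# Spark JDBC 'truncate' option.
(log_status_df_all.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode("overwrite")
    .option("truncate", "true")  # TRUNCATE the table instead of DROP/CREATE
    .option("url", url)
    .option("dbtable", 'Logs_Collection_Status')
    .option("user", username)
    .option("password", password)
    .save())

Because the table is never dropped, a concurrent run cannot trip over a CREATE TABLE issued by another writer.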