Spark DataFrame InsertIntoJDBC - TableAlreadyExists 异常

Spark DataFrame InsertIntoJDBC - TableAlreadyExists Exception

使用 Spark 1.4.0,我尝试使用 insertIntoJdbc() 将 Spark DataFrame 中的数据插入到 MemSQL 数据库中(这应该与 MySQL 数据库完全相同)。但是我不断收到运行时 TableAlreadyExists 异常。

首先,我像这样创建 MemSQL table:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);

然后我在 Spark 中创建一个简单的数据框并尝试像这样插入到 MemSQL 中:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.

insertIntoJDBC 文档实际上是不正确的;他们说 table 必须已经存在,但实际上如果它存在,它会抛出一个错误,正如你在上面看到的:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

我们推荐使用我们的 MemSQL Spark 连接器,您可以在这里找到它:

https://github.com/memsql/memsql-spark-connector

如果您在代码中包含该库并导入 com.memsql.spark.connector._,则可以使用 df.saveToMemSQL(...) 将 DataFrame 保存到 MemSQL。您可以在此处找到我们连接器的文档:

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions

此解决方案适用于一般 JDBC 连接,尽管@wayne 的回答可能是专门针对 memSQL 的更好解决方案。

insertIntoJdbc 从 1.4.0 开始似乎已被弃用,使用它实际上调用 write.jdbc()。

write() returns 一个 DataFrameWriter 对象。如果您想将数据附加到 table,则必须将对象的保存模式更改为 "append"

上述问题示例的另一个问题是 DataFrame 架构与目标 table 的架构不匹配。

下面的代码给出了一个来自 Spark shell 的工作示例。我正在使用 spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar 开始我的 spark-shell 会话。

import java.util.Properties

val prop = new Properties() 
prop.put("user", "root")
prop.put("password", "")  

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")   
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 

我有同样的问题。将 spark 版本更新到 1.6.2 工作正常