Spark DataFrame InsertIntoJDBC - TableAlreadyExists 异常
Spark DataFrame InsertIntoJDBC - TableAlreadyExists Exception
使用 Spark 1.4.0,我尝试使用 insertIntoJdbc() 将 Spark DataFrame 中的数据插入到 MemSQL 数据库中(这应该与 MySQL 数据库完全相同)。但是我不断收到运行时 TableAlreadyExists 异常。
首先,我像这样创建 MemSQL table:
CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);
然后我在 Spark 中创建一个简单的数据框并尝试像这样插入到 MemSQL 中:
val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]
df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)
java.lang.RuntimeException: Table table1 already exists.
insertIntoJDBC 文档实际上是不正确的;他们说 table 必须已经存在,但实际上如果它存在,它会抛出一个错误,正如你在上面看到的:
我们推荐使用我们的 MemSQL Spark 连接器,您可以在这里找到它:
https://github.com/memsql/memsql-spark-connector
如果您在代码中包含该库并导入 com.memsql.spark.connector._,则可以使用 df.saveToMemSQL(...) 将 DataFrame 保存到 MemSQL。您可以在此处找到我们连接器的文档:
此解决方案适用于一般 JDBC 连接,尽管@wayne 的回答可能是专门针对 memSQL 的更好解决方案。
insertIntoJdbc 从 1.4.0 开始似乎已被弃用,使用它实际上调用 write.jdbc()。
write() returns 一个 DataFrameWriter 对象。如果您想将数据附加到 table,则必须将对象的保存模式更改为 "append"
。
上述问题示例的另一个问题是 DataFrame 架构与目标 table 的架构不匹配。
下面的代码给出了一个来自 Spark shell 的工作示例。我正在使用 spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar
开始我的 spark-shell 会话。
import java.util.Properties
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "")
val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")
val dfWriter = df.write.mode("append")
dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
我有同样的问题。将 spark 版本更新到 1.6.2 工作正常
使用 Spark 1.4.0,我尝试使用 insertIntoJdbc() 将 Spark DataFrame 中的数据插入到 MemSQL 数据库中(这应该与 MySQL 数据库完全相同)。但是我不断收到运行时 TableAlreadyExists 异常。
首先,我像这样创建 MemSQL table:
CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);
然后我在 Spark 中创建一个简单的数据框并尝试像这样插入到 MemSQL 中:
val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]
df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)
java.lang.RuntimeException: Table table1 already exists.
insertIntoJDBC 文档实际上是不正确的;他们说 table 必须已经存在,但实际上如果它存在,它会抛出一个错误,正如你在上面看到的:
我们推荐使用我们的 MemSQL Spark 连接器,您可以在这里找到它:
https://github.com/memsql/memsql-spark-connector
如果您在代码中包含该库并导入 com.memsql.spark.connector._,则可以使用 df.saveToMemSQL(...) 将 DataFrame 保存到 MemSQL。您可以在此处找到我们连接器的文档:
此解决方案适用于一般 JDBC 连接,尽管@wayne 的回答可能是专门针对 memSQL 的更好解决方案。
insertIntoJdbc 从 1.4.0 开始似乎已被弃用,使用它实际上调用 write.jdbc()。
write() returns 一个 DataFrameWriter 对象。如果您想将数据附加到 table,则必须将对象的保存模式更改为 "append"
。
上述问题示例的另一个问题是 DataFrame 架构与目标 table 的架构不匹配。
下面的代码给出了一个来自 Spark shell 的工作示例。我正在使用 spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar
开始我的 spark-shell 会话。
import java.util.Properties
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "")
val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")
val dfWriter = df.write.mode("append")
dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
我有同样的问题。将 spark 版本更新到 1.6.2 工作正常