将 PySpark DataFrame 写入 MySQL 的最佳实践

Best Practice when Writing PySpark DataFrames to MySQL

我正在尝试使用 Apache Airflow 和计划的 Spark 作业开发一些数据管道。

对于这些管道之一,我正在尝试将数据从 PySpark DataFrame 写入 MySQL 并且我将 运行 放入一些问题中。这就是我的代码目前的样子,但我确实想在未来对此添加更多转换,

df_tsv = spark.read.csv(tsv_file, sep=r'\t', header=True)
df_tsv.write.jdbc(url=mysql_url, table=mysql_table, mode="append", properties={"user":mysql_user, "password": mysql_password, "driver": "com.mysql.cj.jdbc.Driver" })

这是不断引发的异常,

java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

我想知道的第一件事是如何解决上述问题。

其次,我想知道将数据从 Spark 写入 MySQL 等数据库时的最佳实践是什么。例如,是否有一个选项可以使 DataFrame 中给定列的数据存储在 table 中的指定列中?或者 table 的列名应该与 DataFrame 的列名相同吗?

我在这里想到的另一个选择是将 DataFrame 转换为一个元组列表,然后使用 mysql-python-connector 之类的东西将数据加载到数据库,

rdd = df.rdd
b = rdd.map(tuple)
data = b.collect()

# write data to database using mysql-python-connector

这里更有效的选择是什么?还有其他我不知道的选择吗?

java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

The first thing that I want to know is how I can solve the above issue.

启动 Spark 会话时需要传递 JDBC 连接器 https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Secondly, I would like to know what the best practice is when writing data from Spark to databases like MySQL. For instance, is there an option to make it so that data from a given column in the DataFrame is stored in a specified column in the table? Or should the column names of the table be the same as those of the DataFrame?

是的,数据框列名将与 table 列名匹配。

The other option that I can think of here is to convert the DataFrame to say, a list of tuples and then use something like the mysql-python-connector to load the data into the database,

rdd = df.rdd

b = rdd.map(tuple)

data = b.collect()

# write data to database using mysql-python-connector

不,永远不会这样做,这将破坏使用 Spark(分布式计算)的所有目的。查看上面的 link,您会发现一些关于从哪里开始以及如何 read/write from/to JDBC 数据源的好建议。