Problem connecting SQL Server from Pyspark 2.4 to write data

I am using Pyspark 2.4 and want to write data to SQL Server, but it isn't working.

I placed the jar file downloaded from here into the spark path:

D:\spark-2.4.3-bin-hadoop2.7\spark-2.4.3-bin-hadoop2.7\jars\

But to no avail. Below is the pyspark code to write data into SQL Server:

sql_server_dtls = {'user': 'john', 'password': 'doe'}

ports_budget_joined_DF.write.jdbc(url="jdbc:sqlserver://endpoint:1433;databaseName=poc", table='dbo.test_tmp', mode='overwrite', properties=sql_server_dtls)

I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\sql\readwriter.py", line 982, in jdbc
    self.mode(mode)._jwrite.jdbc(url, table, jprop)
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o45.jdbc.
: java.sql.SQLException: No suitable driver

Am I missing something? Also, I want to truncate the table first before writing the new data to it. Does mode='overwrite' in the DF writer do the same for the SQL Server target system?

You only need the JDBC driver class (com.mysql.cj.jdbc.Driver for MySQL), and Spark can download it automatically into the directory it looks in. The "No suitable driver" error means no "driver" entry was supplied in the connection properties. Since your target is SQL Server rather than MySQL, the equivalent class is com.microsoft.sqlserver.jdbc.SQLServerDriver.

Use this function:

def connect_to_sql(
    spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
    # Build the JDBC URL for the target database.
    jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)

    connection_details = {
        "user": username,
        "password": password,
        # Omitting "driver" is what triggers
        # java.sql.SQLException: No suitable driver.
        "driver": "com.mysql.cj.jdbc.Driver",
    }

    df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
    return df
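Adapting the same pattern to your SQL Server target, a minimal sketch (no live connection; `build_sqlserver_jdbc` is a hypothetical helper, and the hostname/credentials are the placeholders from your question) of the URL and properties you would hand to `write.jdbc`:

```python
# Hypothetical helper: assemble the JDBC URL and properties for a
# SQL Server target, including the "driver" key whose absence causes
# the "No suitable driver" error.
def build_sqlserver_jdbc(hostname, port, database, username, password):
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(
        hostname, port, database
    )
    properties = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    return jdbc_url, properties

url, props = build_sqlserver_jdbc("endpoint", 1433, "poc", "john", "doe")
print(url)  # jdbc:sqlserver://endpoint:1433;databaseName=poc
```

You would then call `ports_budget_joined_DF.write.jdbc(url=url, table='dbo.test_tmp', mode='overwrite', properties=props)` exactly as in your snippet.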

Edit:

You can use the function below (edit it to your needs) to pass packages while declaring your sparkSession(). You can pass the artifact IDs of the packages in a list, or as a comma-separated string. You can get them from The central repository.
from pyspark.sql import SparkSession


def create_spark_session(master_url, packages=None):
    """
    Creates a spark session
    :param master_url: IP address of the cluster you want to submit the job to or local with all cores
    :param packages: Any external packages if needed, only when called. This variable could be a string of the package
        specification or a list of package specifications.
    :return: spark session object
    """
    if packages:
        packages = ",".join(packages) if isinstance(packages, list) else packages
        spark = (
            SparkSession.builder.master(master_url)
            .config("spark.io.compression.codec", "snappy")
            .config("spark.ui.enabled", "false")
            .config("spark.jars.packages", packages)
            .getOrCreate()
        )
    else:
        spark = (
            SparkSession.builder.master(master_url)
            .config("spark.io.compression.codec", "snappy")
            .config("spark.ui.enabled", "false")
            .getOrCreate()
        )

    return spark
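A usage sketch of the list-to-string normalisation the function performs before setting `spark.jars.packages` (the Maven coordinates below are illustrative; check The central repository for the versions matching your Spark and JRE):

```python
# Example artifact IDs; these coordinates exist on Maven Central but
# the versions here are only illustrative.
packages = [
    "mysql:mysql-connector-java:8.0.16",
    "com.microsoft.sqlserver:mssql-jdbc:7.2.2.jre8",
]

# Same normalisation create_spark_session applies: a list is joined
# into the comma-separated string that spark.jars.packages expects.
spark_jars_packages = ",".join(packages) if isinstance(packages, list) else packages
print(spark_jars_packages)

# With pyspark installed you would then create the session:
# spark = create_spark_session("local[*]", packages=packages)
```

Passing the string form (`packages="mysql:mysql-connector-java:8.0.16"`) works identically, since the `isinstance` check leaves strings untouched.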