如何使用 JDBC source 在 (Pyspark?

How to use JDBC source to write and read data in (Py)Spark?

这个问题的目标是记录:

稍作改动,这些方法应该可以与其他受支持的语言一起使用,包括 Scala 和 R。

写入数据

  1. 在提交申请或启动 shell 时包括适用的 JDBC 驱动程序。您可以使用例如 --packages:

     bin/pyspark --packages group:name:version  
    

或结合driver-class-pathjars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

这些属性也可以在 JVM 实例启动之前使用 PYSPARK_SUBMIT_ARGS 环境变量设置,或者使用 conf/spark-defaults.conf 设置 spark.jars.packagesspark.jars / spark.driver.extraClassPath.

  1. 选择需要的模式。 Spark JDBC writer 支持以下模式:

    • append: Append contents of this :class:DataFrame to existing data.
    • overwrite: Overwrite existing data.
    • ignore: Silently ignore this operation if data already exists.
    • error (default case): Throw an exception if data already exists.

    更新或其他细粒度修改

     mode = ...
    
  2. 准备JDBC URI,例如:

     # You can encode credentials in URI or pass
     # separately using properties argument
     # of jdbc method or options
    
     url = "jdbc:postgresql://localhost/foobar"
    
  3. (可选)创建包含 JDBC 个参数的字典。

     properties = {
         "user": "foo",
         "password": "bar"
     }
    

    properties / options也可以用来设置supported JDBC connection properties.

  4. 使用DataFrame.write.jdbc

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

保存数据(详见pyspark.sql.DataFrameWriter)。

已知问题:

  • 当使用 --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    包含驱动程序时,找不到合适的驱动程序

    假设没有驱动程序版本不匹配来解决这个问题,您可以将 driver class 添加到 properties。例如:

      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }
    
  • 使用df.write.format("jdbc").options(...).save()可能导致:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

    解决方案未知。

  • 在 Pyspark 1.3 中你可以尝试直接调用 Java 方法:

      df._jdf.insertIntoJDBC(url, "baz", True)
    

正在读取数据

  1. 按照写入数据

    中的步骤 1-4
  2. 使用sqlContext.read.jdbc:

     sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())

已知问题和陷阱:

  • 找不到合适的驱动程序 - 请参阅:写入数据

  • Spark SQL 支持使用 JDBC 源的谓词下推,尽管并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是用有效的子查询替换 dbtable / table 参数。参见示例:

  • 默认情况下 JDBC 数据源使用单个执行程序线程顺序加载数据。为确保分布式数据加载,您可以:

    • 提供分区column(必须是IntegerType),lowerBoundupperBoundnumPartitions
    • 提供一个互斥谓词列表predicates,每个所需的分区一个。

    参见:

    • ,
    • ,
    • How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
  • 在分布式模式(带有分区列或谓词)中,每个执行程序都在自己的事务中运行。如果同时修改源数据库,则无法保证最终视图一致。

在哪里可以找到合适的驱动程序:

其他选项

根据数据库的不同,可能存在专门的来源,并且在某些情况下是首选:

下载mysql-connector-java驱动并保存在spark jar文件夹中,观察下面的python代码将数据写入"acotr1",我们必须在 mysql 数据库

中创建 acotr1 table 结构
    spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()

    sc = spark.sparkContext

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()

    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"

    df.write.jdbc(mysql_url,table="actor1",mode="append")

参考这个 link 下载 postgres 的 jdbc 并按照步骤下载 jar 文件

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar 文件将在这样的路径中下载。 "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

如果你的spark版本是2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

将文件保存为 python 和 运行 "python respectivefilename.py"