Load SparkSQL dataframe into Postgres database with automatically defined schema

I am currently trying to load a Parquet file into a Postgres database. The Parquet file already has a schema defined, and I want that schema to be carried over to the Postgres table.

I have not defined any schema or table in Postgres. I want the load process to infer the schema automatically when the file is read, create the table, and then load the SparkSQL dataframe into that table.

Here is my code:

import findspark
findspark.init()

from pyspark.sql import SparkSession

appName = "load_parquet"
master = "local"

spark = SparkSession.builder \
        .master(master) \
        .appName(appName) \
        .getOrCreate()

Read the Parquet data in as a Spark dataframe:

customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')

Check that the schema is correct:

customers_sdf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

Write the SparkSQL dataframe to Postgres:

customers_sdf.write \
    .jdbc(
        url="jdbc:postgresql:destdb", 
        table="public.customers", 
        properties={"user": "destdb1", "password": "destdb1"}
    )

My Postgres container's hostname is postgres-dest and its port mapping is 5434:5432. See below:

  postgres-dest:
    image: postgres:latest
    environment:
      POSTGRES_USER: destdb1
      POSTGRES_PASSWORD: destdb1
      POSTGRES_DB: destdb
    logging:
      options:
        max-size: 10m
        max-file: "3"
    ports:
      - "5434:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "destdb1"]
      interval: 5s
      retries: 5
    restart: always

  pyspark-notebook:
    build: .
    image: jupyter/pyspark-notebook:latest
    environment:
      JUPYTER_ENABLE_LAB: 'yes'
    ports:
      - "8889:8889"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks
      - ./filesystem:/home/jovyan/filesystem

I tried writing the dataframe to Postgres as shown above, but I get this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_101/2914557055.py in <module>
----> 1 customers_sdf.write \
      2     .jdbc(url="jdbc:postgresql://postgres-dest/destdb", table="public.customers", properties={"user": "destdb1", "password": "destdb1"})

/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, mode, properties)
   1443         for k in properties:
   1444             jprop.setProperty(k, properties[k])
-> 1445         self.mode(mode)._jwrite.jdbc(url, table, jprop)
   1446 
   1447 

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o231.jdbc.
: java.sql.SQLException: No suitable driver
    at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass(JDBCOptions.scala:108)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:817)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

Note: I am an absolute beginner with Spark, so please explain it like I'm five.

Change the url to jdbc:postgresql://postgres-dest:5432/destdb. From inside the compose network, the notebook container reaches the database at the service name postgres-dest on the container port 5432; the 5434 mapping only applies from the host.
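
With that change, the write call would look roughly like this (a sketch; everything else is taken from the question, and naming the driver class in the properties is optional once the jar is on the classpath):

customers_sdf.write.jdbc(
    url="jdbc:postgresql://postgres-dest:5432/destdb",
    table="public.customers",
    properties={
        "user": "destdb1",
        "password": "destdb1",
        # Explicit driver class; helps Spark resolve the connection once
        # the PostgreSQL jar is available on the classpath.
        "driver": "org.postgresql.Driver",
    },
)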

Also make sure the PostgreSQL driver jar is present on the classpath. You can download the jar from here.
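
One way to get the driver onto the classpath is to add it when building the SparkSession. A minimal sketch, assuming the notebook container can reach Maven Central; the driver version below is my own choice, and spark.jars pointing at a locally downloaded jar works just as well:

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName("load_parquet")
    # Fetch the PostgreSQL JDBC driver and put it on the classpath.
    # This must be set before the SparkSession is first created.
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    # Alternative if the jar was downloaded manually:
    # .config("spark.jars", "/path/to/postgresql-42.6.0.jar")
    .getOrCreate()
)

With the driver available, Spark creates public.customers from the dataframe schema on the first write, so no table needs to be defined in Postgres beforehand.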