Load SparkSQL dataframe into Postgres database with automatically defined schema
I am currently trying to load a Parquet file into a Postgres database. The Parquet file already has a schema defined, and I want that schema carried over to the Postgres table.
I have not defined any schema or table in Postgres. I want the load process to infer the schema automatically on read, create the table, and then load the SparkSQL dataframe into that table.
Here is my code:
import findspark
findspark.init()
from pyspark.sql import SparkSession
appName = "load_parquet"
master = "local"
spark = SparkSession.builder \
.master(master) \
.appName(appName) \
.getOrCreate()
Read the Parquet data in as a Spark dataframe
customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')
Check that the schema is correct
customers_sdf.printSchema()
root
|-- customer_id: string (nullable = true)
|-- customer_unique_id: string (nullable = true)
|-- customer_zip_code_prefix: string (nullable = true)
|-- customer_city: string (nullable = true)
|-- customer_state: string (nullable = true)
Write the SparkSQL dataframe to Postgres
customers_sdf.write \
.jdbc(
url="jdbc:postgresql:destdb",
table="public.customers",
properties={"user": "destdb1", "password": "destdb1"}
)
My Postgres container's hostname is postgres-dest and its port mapping is 5434:5432. See below:
postgres-dest:
  image: postgres:latest
  environment:
    POSTGRES_USER: destdb1
    POSTGRES_PASSWORD: destdb1
    POSTGRES_DB: destdb
  logging:
    options:
      max-size: 10m
      max-file: "3"
  ports:
    - "5434:5432"
  healthcheck:
    test: ["CMD", "pg_isready", "-U", "destdb1"]
    interval: 5s
    retries: 5
  restart: always

pyspark-notebook:
  build: .
  image: jupyter/pyspark-notebook:latest
  environment:
    JUPYTER_ENABLE_LAB: 'yes'
  ports:
    - "8889:8889"
    - "4040-4080:4040-4080"
  volumes:
    - ./notebooks:/home/jovyan/work/notebooks
    - ./filesystem:/home/jovyan/filesystem
I tried writing the dataframe to Postgres as shown above, but I get this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/tmp/ipykernel_101/2914557055.py in <module>
----> 1 customers_sdf.write \
2 .jdbc(url="jdbc:postgresql://postgres-dest/destdb", table="public.customers", properties={"user": "destdb1", "password": "destdb1"})
/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, mode, properties)
1443 for k in properties:
1444 jprop.setProperty(k, properties[k])
-> 1445 self.mode(mode)._jwrite.jdbc(url, table, jprop)
1446
1447
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o231.jdbc.
: java.sql.SQLException: No suitable driver
at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass(JDBCOptions.scala:108)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:817)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Note: I am an absolute beginner with Spark, so please explain it like I'm 5.
Change the url to jdbc:postgresql://postgres-dest:5432/destdb. Both containers are on the same Docker Compose network, so Spark should reach Postgres through the service name postgres-dest on the container-internal port 5432; the host-mapped port 5434 only matters for connections coming from the host machine.
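A minimal sketch of the corrected write call, reusing the table name and credentials from your snippet. The mode argument and the explicit driver entry are my additions for illustration, not something from your original code:

customers_sdf.write \
    .jdbc(
        url="jdbc:postgresql://postgres-dest:5432/destdb",  # service name + container-internal port
        table="public.customers",
        mode="overwrite",  # optional: drop and recreate the table on re-runs; the default mode also creates the table when it does not exist yet
        properties={
            "user": "destdb1",
            "password": "destdb1",
            "driver": "org.postgresql.Driver"  # name the driver class explicitly so Spark does not have to guess
        }
    )

With this, Spark derives the Postgres column types from the dataframe schema and creates public.customers for you.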
Also make sure the PostgreSQL JDBC driver jar is on the classpath; the java.sql.SQLException: No suitable driver error means Spark cannot find it. You can download the jar from here.
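Instead of downloading the jar by hand, you can also have Spark pull the driver from Maven when the SparkSession is built. This is only a sketch under that assumption: the version number and the local jar path below are examples, not taken from your setup, and the config has to be set before the first SparkSession is created in the notebook:

from pyspark.sql import SparkSession

# Pull the PostgreSQL JDBC driver from Maven Central at session start-up.
spark = SparkSession.builder \
    .master("local") \
    .appName("load_parquet") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0") \
    .getOrCreate()

# Or, if you downloaded the jar manually, point Spark at the file instead
# (the path below is just an example location inside the notebook container):
# .config("spark.jars", "/home/jovyan/jars/postgresql-42.5.0.jar")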