Pyspark: Read data from table and write to File
I am running my PySpark code on an HDInsight Spark cluster. I am trying to read data from a Postgres table and write it to a file, as shown below.
pgsql_df is returning a DataFrameReader instead of a DataFrame, so I cannot write it out to a file.
Why does "spark.read" return a DataFrameReader? What am I missing here?
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import functions as dbpull
from datetime import datetime
from pyspark.sql.types import Row
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameReader
from pyspark.sql import DataFrameWriter
import random
import string
from pyspark.sql.functions import *
import sys
spark=SparkSession.builder.master("local").appName("db pull").getOrCreate()
pgsql_df=spark.read.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql://<hostdetails>") \
.option("dbtable", "table") \
.option("user", "user") \
.option("password", "password")```
>>> pgsql_df
<pyspark.sql.readwriter.DataFrameReader object at 0x7fb43ce1f890>
pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)
**Error:**
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'write'
Please check the code below. You are missing a call to load() on the DataFrameReader object.
pgsql_df=spark.read.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql://<hostdetails>") \
.option("dbtable", "table") \
.option("user", "user") \
.option("password", "password")
.load() // this is missing
pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)
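To see why the original attempt failed: spark.read and every .option(...) call return the same DataFrameReader builder, and only load() actually runs the JDBC read and returns a DataFrame. A quick REPL check, reusing the pgsql_df reader from the question (before load() is called):
>>> type(spark.read)
<class 'pyspark.sql.readwriter.DataFrameReader'>
>>> df = pgsql_df.load()   # runs the JDBC read
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> type(df.write)         # only a DataFrame has .write
<class 'pyspark.sql.readwriter.DataFrameWriter'>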
Or, chaining load() and the write in a single statement:
pgsql_df=spark.read.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql://<hostdetails>") \
.option("dbtable", "table") \
.option("user", "user") \
.option("password", "password")
# call load() here to get a DataFrame before writing
pgsql_df.load() \
.write \
.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)