在 PySpark 中将 csv 加载到 DataFrame 时出现问题
Problem loading csv into DataFrame in PySpark
我正在尝试将一堆 CSV 文件聚合成一个文件,并使用 AWS Glue 中的 ETL 作业将其以 ORC 格式输出到 S3。我的汇总 CSV 如下所示:
header1,header2,header3
foo1,foo2,foo3
bar1,bar2,bar3
我有一个名为 aggregated_csv
的聚合 CSV 的字符串表示,其内容为 header1,header2,header3\nfoo1,foo2,foo3\nbar1,bar2,bar3
。
我读过 pyspark 有一种直接的方法可以将 CSV 文件转换为数据帧(我需要它以便我可以利用 Glue 的能力在 ORC 中轻松输出)。这是我尝试过的片段:
def f(glueContext, aggregated_csv, schema):
with open('somefile', 'a+') as agg_file:
agg_file.write(aggregated_csv)
#agg_file.seek(0)
df = glueContext.read.csv(agg_file, schema=schema, header="true")
df.show()
无论有没有搜索,我都试过了。当我不调用 seek() 时,作业成功完成,但 df.show()
不显示 headers 以外的任何数据。当我调用 seek() 时,出现以下异常:
pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-48-255.us-west-2.compute.internal:8020/user/root/header1,header2,header3\n;'
由于 seek 似乎改变了行为,并且由于我的 csv 中的 headers 是异常字符串的一部分,所以我假设问题与我通过文件到 glueContext.read.csv()
但我不确定如何解决它。如果我取消注释 seek(0)
调用并添加 agg_file.read()
命令,我可以按预期看到文件的全部内容。我需要更改什么才能成功读取刚刚写入 spark 数据帧的 csv 文件?
我认为您向 csv
函数传递了错误的参数。我相信 GlueContext.read.csv()
将获得 DataFrameReader.csv()
的一个实例,并且它的签名将文件名作为第一个参数,而您传递的是一个类文件对象。
def f(glueContext, aggregated_csv, schema):
with open('somefile', 'a+') as agg_file:
agg_file.write(aggregated_csv)
#agg_file.seek(0)
df = glueContext.read.csv('somefile', schema=schema, header="true")
df.show()
但是,如果你想让它写一个 ORC 文件,并且你已经将数据读取为 aggregated_csv
,你可以直接从元组列表中创建一个 DataFrame
。
df = spark.createDataFrame([('foo1','foo2','foo3'), ('bar1','bar2','bar3')], ['header1', 'header2', 'header3'])
然后,如果你需要 Glue DynamicFrame
使用 fromDF 功能
dynF = fromDF(df, glueContext, 'myFrame')
还有一点:你不需要胶水来编写 ORC - 激发它完全有能力。只需使用 DataFrameWriter.orc()
函数:
df.write.orc('s3://path')
我正在尝试将一堆 CSV 文件聚合成一个文件,并使用 AWS Glue 中的 ETL 作业将其以 ORC 格式输出到 S3。我的汇总 CSV 如下所示:
header1,header2,header3
foo1,foo2,foo3
bar1,bar2,bar3
我有一个名为 aggregated_csv
的聚合 CSV 的字符串表示,其内容为 header1,header2,header3\nfoo1,foo2,foo3\nbar1,bar2,bar3
。
我读过 pyspark 有一种直接的方法可以将 CSV 文件转换为数据帧(我需要它以便我可以利用 Glue 的能力在 ORC 中轻松输出)。这是我尝试过的片段:
def f(glueContext, aggregated_csv, schema):
with open('somefile', 'a+') as agg_file:
agg_file.write(aggregated_csv)
#agg_file.seek(0)
df = glueContext.read.csv(agg_file, schema=schema, header="true")
df.show()
无论有没有搜索,我都试过了。当我不调用 seek() 时,作业成功完成,但 df.show()
不显示 headers 以外的任何数据。当我调用 seek() 时,出现以下异常:
pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-48-255.us-west-2.compute.internal:8020/user/root/header1,header2,header3\n;'
由于 seek 似乎改变了行为,并且由于我的 csv 中的 headers 是异常字符串的一部分,所以我假设问题与我通过文件到 glueContext.read.csv()
但我不确定如何解决它。如果我取消注释 seek(0)
调用并添加 agg_file.read()
命令,我可以按预期看到文件的全部内容。我需要更改什么才能成功读取刚刚写入 spark 数据帧的 csv 文件?
我认为您向 csv
函数传递了错误的参数。我相信 GlueContext.read.csv()
将获得 DataFrameReader.csv()
的一个实例,并且它的签名将文件名作为第一个参数,而您传递的是一个类文件对象。
def f(glueContext, aggregated_csv, schema):
with open('somefile', 'a+') as agg_file:
agg_file.write(aggregated_csv)
#agg_file.seek(0)
df = glueContext.read.csv('somefile', schema=schema, header="true")
df.show()
但是,如果你想让它写一个 ORC 文件,并且你已经将数据读取为 aggregated_csv
,你可以直接从元组列表中创建一个 DataFrame
。
df = spark.createDataFrame([('foo1','foo2','foo3'), ('bar1','bar2','bar3')], ['header1', 'header2', 'header3'])
然后,如果你需要 Glue DynamicFrame
使用 fromDF 功能
dynF = fromDF(df, glueContext, 'myFrame')
还有一点:你不需要胶水来编写 ORC - 激发它完全有能力。只需使用 DataFrameWriter.orc()
函数:
df.write.orc('s3://path')