How to load complex data using PySpark
I have a CSV dataset that looks like the following. Please find the data below in text form:
Timestamp,How old are you?,What industry do you work in?,Job title,What is your annual salary?,Please indicate the currency,Where are you located? (City/state/country),How many years of post-college professional work experience do you have?,"If your job title needs additional context, please clarify here:","If ""Other,"" please indicate the currency here: "
4/24/2019 11:43:21,35-44,Government,Talent Management Asst. Director,75000,USD,"Nashville, TN",11 - 20 years,,
4/24/2019 11:43:26,25-34,Environmental nonprofit,Operations Director,"65,000",USD,"Madison, Wi",8 - 10 years,,
4/24/2019 11:43:27,18-24,Market Research,Market Research Assistant,"36,330",USD,"Las Vegas, NV",2 - 4 years,,
4/24/2019 11:43:27,25-34,Biotechnology,Senior Scientist,34600,GBP,"Cardiff, UK",5-7 years,,
4/24/2019 11:43:29,25-34,Healthcare,Social worker (embedded in primary care),55000,USD,"Southeast Michigan, USA",5-7 years,,
4/24/2019 11:43:29,25-34,Information Management,Associate Consultant,"45,000",USD,"Seattle, WA",8 - 10 years,,
4/24/2019 11:43:30,25-34,Nonprofit ,Development Manager ,"51,000",USD,"Dallas, Texas, United States",2 - 4 years,"I manage our fundraising department, primarily overseeing our direct mail, planned giving, and grant writing programs. ",
4/24/2019 11:43:30,25-34,Higher Education,Student Records Coordinator,"54,371",USD,Philadelphia,8 - 10 years,equivalent to Assistant Registrar,
4/25/2019 8:35:51,25-34,Marketing,Associate Product Manager,"43,000",USD,"Cincinnati, OH, USA",5-7 years,"I started as the Marketing Coordinator, and was given the ""Associate Product Manager"" title as a promotion. My duties remained mostly the same and include graphic design work, marketing, and product management.",
Now, I tried the following code to load the data:
df = spark.read.option("header", "true").option("multiline", "true").option("delimiter", ",").csv("path")
It splits the last record across columns, and the output does not match what I expect: the last column, "If ""Other,"" please indicate the currency here: ", should be null, and the entire string should end up in the earlier column, "If your job title needs additional context, please clarify here:".
I also tried .option('quote','/"').option('escape','/"'), but it still didn't work.
However, when I load this file with Pandas, it is parsed correctly. I am surprised that Pandas can recognize where each new column value starts. I could define a String schema for all columns and load the result back into a Spark dataframe, but since I am on an older Spark version, that would not run in a distributed way, so I am exploring how Spark itself can handle this efficiently.
Any help is much appreciated.
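For what it's worth, the doubled quotes ("") in the data are the standard RFC 4180 way of embedding a literal quote character inside a quoted field, which is why Pandas handles this file out of the box. Python's built-in csv module follows the same convention; a minimal sketch on the problematic row:

```python
import csv
import io

# The problematic row: the ninth field is quoted and contains doubled
# quotes (""), the RFC 4180 way to embed a literal quote character.
line = ('4/25/2019 8:35:51,25-34,Marketing,Associate Product Manager,'
        '"43,000",USD,"Cincinnati, OH, USA",5-7 years,'
        '"I started as the Marketing Coordinator, and was given the '
        '""Associate Product Manager"" title as a promotion.",')

row = next(csv.reader(io.StringIO(line)))
print(len(row))  # 10 -- the row splits into the expected column count
print(row[8])    # each "" comes back as a single " in the parsed value
print(row[9])    # '' -- the trailing field is empty, i.e. null
```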
The main problem is the consecutive double quotes in your CSV file. You have to escape those extra double quotes in the CSV file, like this:
4/24/2019 11:43:30,25-34,Higher Education,Student Records Coordinator,"54,371",USD,Philadelphia,8 - 10 years,equivalent to Assistant Registrar,
4/25/2019 8:35:51,25-34,Marketing,Associate Product Manager,"43,000",USD,"Cincinnati, OH, USA",5-7 years,"I started as the Marketing Coordinator, and was given the \"Associate Product Manager\" title as a promotion. My duties remained mostly the same and include graphic design work, marketing, and product management.",
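If you go this route, the doubled quotes can be rewritten programmatically instead of by hand; a minimal sketch (the function name is illustrative):

```python
def escape_doubled_quotes(line: str) -> str:
    # Rewrite RFC 4180 doubled quotes ("") as backslash-escaped quotes
    # (\"), which matches Spark's default escape character. Caveat: an
    # empty quoted field ("") would also be rewritten, so this only
    # suits files whose empty cells are unquoted, as in the sample above.
    return line.replace('""', '\\"')

print(escape_doubled_quotes('given the ""Associate Product Manager"" title'))
# given the \"Associate Product Manager\" title
```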
After that, it produces the expected result:
df2 = spark.read.option("header",True).csv("sample1.csv")
df2.show(10,truncate=False)
******** Output ********
|4/25/2019 8:35:51 |25-34 |Marketing |Associate Product Manager |43,000 |USD |Cincinnati, OH, USA |5-7 years |I started as the Marketing Coordinator, and was given the "Associate Product Manager" title as a promotion. My duties remained mostly the same and include graphic design work, marketing, and product management.|null |null |
Alternatively, you can use the code below:
df2 = spark.read.option("header",True).option("multiline","true").option("escape","\"").csv("sample1.csv")