如何使用 Pyspark 加载复杂数据

How to load complex data using Pyspark

我有一个 CSV dataset 如下所示:

此外,文本形式的 PFB 数据:

Timestamp,How old are you?,What industry do you work in?,Job title,What is your annual salary?,Please indicate the currency,Where are you located? (City/state/country),How many years of post-college professional work experience do you have?,"If your job title needs additional context, please clarify here:","If ""Other,"" please indicate the currency here: "
4/24/2019 11:43:21,35-44,Government,Talent Management Asst. Director,75000,USD,"Nashville, TN",11 - 20 years,,
4/24/2019 11:43:26,25-34,Environmental nonprofit,Operations Director,"65,000",USD,"Madison, Wi",8 - 10 years,,
4/24/2019 11:43:27,18-24,Market Research,Market Research Assistant,"36,330",USD,"Las Vegas, NV",2 - 4 years,,
4/24/2019 11:43:27,25-34,Biotechnology,Senior Scientist,34600,GBP,"Cardiff, UK",5-7 years,,
4/24/2019 11:43:29,25-34,Healthcare,Social worker (embedded in primary care),55000,USD,"Southeast Michigan, USA",5-7 years,,
4/24/2019 11:43:29,25-34,Information Management,Associate Consultant,"45,000",USD,"Seattle, WA",8 - 10 years,,
4/24/2019 11:43:30,25-34,Nonprofit ,Development Manager ,"51,000",USD,"Dallas, Texas, United States",2 - 4 years,"I manage our fundraising department, primarily overseeing our direct mail, planned giving, and grant writing programs. ",
4/24/2019 11:43:30,25-34,Higher Education,Student Records Coordinator,"54,371",USD,Philadelphia,8 - 10 years,equivalent to Assistant Registrar,  
4/25/2019 8:35:51,25-34,Marketing,Associate Product Manager,"43,000",USD,"Cincinnati, OH, USA",5-7 years,"I started as the Marketing Coordinator, and was given the ""Associate Product Manager"" title as a promotion. My duties remained mostly the same and include graphic design work, marketing, and product management.",

现在,我尝试了以下代码来加载数据:

df = spark.read.option("header", "true").option("multiline", "true").option(
    "delimiter", ",").csv("path")

它为我提供了如下最后一条划分列的记录的输出,而且输出与预期不符:

最后一列的值应为 null,即 "If ""Other,"" please indicate the currency here: ",整个字符串应包含在较早的列中,即 "If your job title needs additional context, please clarify here:"

我也试过.option('quote','/"').option('escape','/"')但还是不行。

但是,当我尝试使用 Pandas 加载此文件时,它已正确加载。我很惊讶 Pandas 如何识别新列名称的开始位置和所有位置。也许我可以为所有列定义一个 String schema 并将其加载回 spark 数据框,但由于我使用的是较低的 spark 版本,它不会以分布式方式工作,因此我正在探索 Spark 如何可以有效地处理这个问题。

非常感谢任何帮助。

主要问题是您的 csv 文件中的连续双引号。 你必须在你的 csv 文件中转义额外的双引号 像这样:

4/24/2019 11:43:30,25-34,Higher Education,Student Records Coordinator,"54,371",USD,Philadelphia,8 - 10 years,equivalent to Assistant Registrar,  
4/25/2019 8:35:51,25-34,Marketing,Associate Product Manager,"43,000",USD,"Cincinnati, OH, USA",5-7 years,"I started as the Marketing Coordinator, and was given the  \" \ " Associate Product Manager \" \ " title as a promotion. My duties remained mostly the same and include graphic design work, marketing, and product management.",

在此之后它会按预期生成结果:

df2 = spark.read.option("header",True).csv("sample1.csv")

df2.show(10,truncate=False)

******** 输出 ********

|4/25/2019 8:35:51 |25-34           |Marketing                    |Associate Product Manager               |43,000                     |USD                         |Cincinnati, OH, USA                        |5-7 years                                                               |I started as the Marketing Coordinator, and was given the ""Associate Product Manager"" title as a promotion. My duties remained mostly the same and include graphic design work, marketing, and product management.|null       |null                                   |

或者你可以使用打击代码

df2 = spark.read.option("header",True).option("multiline","true").option("escape","\"").csv("sample1.csv")