为 Spark 清理和准备数据

Cleaning and preparind data for Spark

我从 Scala、Spark 和 MLlib 开始。 我想从 Kaggle

中实现一个例子

数据格式很糟糕,我在清理和准备数据以处理它们时遇到了很多问题。我请求你的帮助。

数据为:

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S

我在

等空白字段中遇到错误

...,"",... (first line, field "cabin")

...,"A/5 21171",.... (also first line, field "ticket")

我想过滤带有空字段的行(从我的 RDD 中删除它们)以及那些有这样票的行 A/5 21171(我只要数字).

再次感谢您的帮助! ;)

而不是 RDDs,您应该考虑使用 DataSets 以提高性能和易用性——特别是如果您是 Scala 的新手。采用 DataSet 方法,您可以这样做:

val titanicDs = sparkSession.read
    .option("header", true)
    .csv("titanic.csv")
    .na
    .drop
    .withColumn("TicketSplit", split($"Ticket", " "))
    .withColumn("Ticket", when(size($"TicketSplit") === "2", $"TicketSplit".getItem(1)).otherwise($"TicketSplit".getItem(0)))
    .drop("TicketSplit")

这里发生了很多事情:

  • header 选项设置为 true,以便 Spark 意识到第一行是数据的 header 强加结构,并在 DataFrame.[=44= 中使用这些列名称]
  • na 方法 returns 一个 DataFrameNaFunctions object 即 very helpful 用于处理缺失数据。在这种情况下,na.drop 的组合消除了所有包含 any 数据的行 null.
  • 我添加了一个名为 TicketSplit 的新临时列,其中我使用精彩的 functions library 将原始 Ticket 数据拆分为 space 字符放入长度为 1(如果只有数字)或 2(如果文本后跟 space 和数字)的数组中。
  • 我使用 functions 库中的 whenotherwise 根据 TicketSplit 中数组的大小修改原始 Ticket 列柱子。无论 TicketSplit 列中数组的大小如何,最终只有通过在索引 0 处获取 1 元素数组的第一个元素或在索引 1 处获取 2 元素数组的第二个元素来保留数字。
  • 删除 TicketSplit 列,因为它已达到其目的。
  • 喝杯冷饮。