如何基于HDFS用Spark-Python进行数据清洗

How to conduct Data Cleaning with Spark-Python based on HDFS

目前,我专注于数据挖掘项目中的数据预处理。具体来说,我想用基于HDFS的PySpark做数据清洗。我对这些东西很陌生,所以我想问一下怎么做?

例如,HDFS 中有一个 table 包含以下条目:

attrA   attrB   attrC      label
1       a       abc        0
2               abc        0
4       b       abc        1
4       b       abc        1
5       a       abc        0

清除所有条目后,第 2 行 <2, , abc, 0> 应具有 attrB 的默认值或估算值,并且应删除第 3 行或第 3 行。那么如何使用 PySpark 实现它呢?

根据你的要求,有两件事你想实现,首先删除重复的行,这可以通过 distinct 函数实现

df2 = df.distinct().show()

将为您提供数据框的不同行。

其次是补缺值,可以通过fillna函数来实现

df2 = df.na.fill({'attrB': 'm'}).show()

这是任何数据驱动解决方案中的一个非常普遍的问题。我推荐使用 Pyspark 进行数据清理的最佳工具是 Optimus

所以让我们看看。首先让我们假设你已经在内存中有这个 DF:

df.show()

+-----+-----+-----+-----+
|attrA|attrB|attrC|label|
+-----+-----+-----+-----+
|    1|    a|  abc|    0|
|    2|     |  abc|    0|
|    4|    b|  abc|    1|
|    4|    b|  abc|    1|
|    5|    a|  abc|    0|
+-----+-----+-----+-----+

首先让我们实例化 DFTransfomer:

transformer = op.DataFrameTransformer(df)

  1. 设置空单元格的默认值:

df_default = transformer.replace_col(search='', change_to='new_value', columns='attrB').df

df_default.show()

+-----+---------+-----+-----+
|attrA|    attrB|attrC|label|
+-----+---------+-----+-----+
|    1|        a|  abc|    0|
|    2|new_value|  abc|    0|
|    4|        b|  abc|    1|
|    4|        b|  abc|    1|
|    5|        a|  abc|    0|
+-----+---------+-----+-----+
  1. 消除重复记录:

df_clean = transformer.remove_duplicates(["attrA","attrB"]).df df_clean.show()

 +-----+---------+-----+-----+
 |attrA|    attrB|attrC|label|
 +-----+---------+-----+-----+
 |    4|        b|  abc|    1|
 |    5|        a|  abc|    0|
 |    1|        a|  abc|    0|
 |    2|new_value|  abc|    0|
 +-----+---------+-----+-----+