如何基于HDFS用Spark-Python进行数据清洗
How to conduct Data Cleaning with Spark-Python based on HDFS
目前,我专注于数据挖掘项目中的数据预处理。具体来说,我想用基于HDFS的PySpark做数据清洗。我对这些东西很陌生,所以我想问一下怎么做?
例如,HDFS 中有一个 table 包含以下条目:
attrA attrB attrC label
1 a abc 0
2 abc 0
4 b abc 1
4 b abc 1
5 a abc 0
清除所有条目后,第 2 行 <2, , abc, 0>
应具有 attrB 的默认值或估算值,并且应删除第 3 行或第 3 行。那么如何使用 PySpark 实现它呢?
根据你的要求,有两件事你想实现,首先删除重复的行,这可以通过 distinct 函数实现
df2 = df.distinct().show()
将为您提供数据框的不同行。
其次是补缺值,可以通过fillna函数来实现
df2 = df.na.fill({'attrB': 'm'}).show()
这是任何数据驱动解决方案中的一个非常普遍的问题。我推荐使用 Pyspark 进行数据清理的最佳工具是 Optimus。
所以让我们看看。首先让我们假设你已经在内存中有这个 DF:
df.show()
+-----+-----+-----+-----+
|attrA|attrB|attrC|label|
+-----+-----+-----+-----+
| 1| a| abc| 0|
| 2| | abc| 0|
| 4| b| abc| 1|
| 4| b| abc| 1|
| 5| a| abc| 0|
+-----+-----+-----+-----+
首先让我们实例化 DFTransfomer:
transformer = op.DataFrameTransformer(df)
- 设置空单元格的默认值:
df_default = transformer.replace_col(search='', change_to='new_value', columns='attrB').df
df_default.show()
+-----+---------+-----+-----+
|attrA| attrB|attrC|label|
+-----+---------+-----+-----+
| 1| a| abc| 0|
| 2|new_value| abc| 0|
| 4| b| abc| 1|
| 4| b| abc| 1|
| 5| a| abc| 0|
+-----+---------+-----+-----+
- 消除重复记录:
df_clean = transformer.remove_duplicates(["attrA","attrB"]).df
df_clean.show()
+-----+---------+-----+-----+
|attrA| attrB|attrC|label|
+-----+---------+-----+-----+
| 4| b| abc| 1|
| 5| a| abc| 0|
| 1| a| abc| 0|
| 2|new_value| abc| 0|
+-----+---------+-----+-----+
目前,我专注于数据挖掘项目中的数据预处理。具体来说,我想用基于HDFS的PySpark做数据清洗。我对这些东西很陌生,所以我想问一下怎么做?
例如,HDFS 中有一个 table 包含以下条目:
attrA attrB attrC label
1 a abc 0
2 abc 0
4 b abc 1
4 b abc 1
5 a abc 0
清除所有条目后,第 2 行 <2, , abc, 0>
应具有 attrB 的默认值或估算值,并且应删除第 3 行或第 3 行。那么如何使用 PySpark 实现它呢?
根据你的要求,有两件事你想实现,首先删除重复的行,这可以通过 distinct 函数实现
df2 = df.distinct().show()
将为您提供数据框的不同行。
其次是补缺值,可以通过fillna函数来实现
df2 = df.na.fill({'attrB': 'm'}).show()
这是任何数据驱动解决方案中的一个非常普遍的问题。我推荐使用 Pyspark 进行数据清理的最佳工具是 Optimus。
所以让我们看看。首先让我们假设你已经在内存中有这个 DF:
df.show()
+-----+-----+-----+-----+
|attrA|attrB|attrC|label|
+-----+-----+-----+-----+
| 1| a| abc| 0|
| 2| | abc| 0|
| 4| b| abc| 1|
| 4| b| abc| 1|
| 5| a| abc| 0|
+-----+-----+-----+-----+
首先让我们实例化 DFTransfomer:
transformer = op.DataFrameTransformer(df)
- 设置空单元格的默认值:
df_default = transformer.replace_col(search='', change_to='new_value', columns='attrB').df
df_default.show()
+-----+---------+-----+-----+
|attrA| attrB|attrC|label|
+-----+---------+-----+-----+
| 1| a| abc| 0|
| 2|new_value| abc| 0|
| 4| b| abc| 1|
| 4| b| abc| 1|
| 5| a| abc| 0|
+-----+---------+-----+-----+
- 消除重复记录:
df_clean = transformer.remove_duplicates(["attrA","attrB"]).df
df_clean.show()
+-----+---------+-----+-----+
|attrA| attrB|attrC|label|
+-----+---------+-----+-----+
| 4| b| abc| 1|
| 5| a| abc| 0|
| 1| a| abc| 0|
| 2|new_value| abc| 0|
+-----+---------+-----+-----+