从 pandas 对 Spark 数据帧执行预处理操作
Perform preprocessing operations from pandas on Spark dataframe
我有一个相当大的 CSV,所以我使用 AWS EMR 将数据读入 Spark 数据帧以执行一些操作。我有一个 pandas 函数可以做一些简单的预处理:
def clean_census_data(df):
"""
This function cleans the dataframe and drops columns that contain 70% NaN values
"""
# Replace None or 0 with np.nan
df = df.replace('None', np.nan)
# Replace weird numbers
df = df.replace(-666666666.0, np.nan)
# Drop columns that contain 70% NaN or 0 values
df = df.loc[:, df.isnull().mean() < .7]
return df
我想把这个函数应用到一个Spark数据帧上,但是函数不一样。我不熟悉 Spark,在 pandas 中执行这些相当简单的操作对我来说并不明显,如何在 Spark 中执行相同的操作。我知道我可以将 Spark 数据帧转换为 pandas,但这似乎效率不高。
本机 Spark 函数可以对每一列进行此类聚合。
以下数据框包含 null、nans 和 zeros 的百分比。
df2 = df1.select(
[(F.count(F.when(F.isnan(c) | F.col(c).isNull() | (F.col(c) == 0), c))
/ F.count(F.lit(1))).alias(c)
for c in df1.columns]
)
举个例子:
from pyspark.sql import functions as F
df1 = spark.createDataFrame(
[(1000, 0, None),
(None, 2, None),
(None, 3, 2222),
(None, 4, 2233),
(None, 5, 2244)],
['c1', 'c2', 'c3'])
df2 = df1.select(
[(F.count(F.when(F.isnan(c) | F.col(c).isNull() | (F.col(c) == 0), c))
/ F.count(F.lit(1))).alias(c)
for c in df1.columns]
)
df2.show()
# +---+---+---+
# | c1| c2| c3|
# +---+---+---+
# |0.8|0.2|0.4|
# +---+---+---+
剩下的就是从 df1 中选择列:
df = df1.select([c for c in df1.columns if df2.head()[c] < .7])
df.show()
# +---+----+
# | c2| c3|
# +---+----+
# | 0|null|
# | 2|null|
# | 3|2222|
# | 4|2233|
# | 5|2244|
# +---+----+
百分比是根据这个条件计算出来的,根据需要修改:
F.isnan(c) | F.col(c).isNull() | (F.col(c) == 0)
这会将 None 替换为 np.nan:
df.fillna(np.nan)
这会将指定值替换为 np.nan:
df.replace(-666666666, np.nan)
第一次回答,请多多包涵。此函数应该使用 pyspark 数据帧而不是 pandas 数据帧,并且应该给你类似的结果:
def clean_census_data(df):
"""
This function cleans the dataframe and drops columns that contain 70% NaN values
"""
# Replace None or 0 with np.nan
df = df.replace('None', None)
# Replace weird numbers
df = df.replace(-666666666.0, None)
# Drop columns that contain 70% NaN or 0 values
selection_dict = df.select([(count(when(isnan(c) | col(c).isNull() | (col(c).cast('int') == 0), c))/count(c) > .7).alias(c) for c in df.columns]).first().asDict()
columns_to_remove = [name for name, is_selected in selection_dict.items() if is_selected]
df = df.drop(*columns_to_remove)
return df
注意:生成的数据帧包含 None 而不是 np.nan。
我有一个相当大的 CSV,所以我使用 AWS EMR 将数据读入 Spark 数据帧以执行一些操作。我有一个 pandas 函数可以做一些简单的预处理:
def clean_census_data(df):
"""
This function cleans the dataframe and drops columns that contain 70% NaN values
"""
# Replace None or 0 with np.nan
df = df.replace('None', np.nan)
# Replace weird numbers
df = df.replace(-666666666.0, np.nan)
# Drop columns that contain 70% NaN or 0 values
df = df.loc[:, df.isnull().mean() < .7]
return df
我想把这个函数应用到一个Spark数据帧上,但是函数不一样。我不熟悉 Spark,在 pandas 中执行这些相当简单的操作对我来说并不明显,如何在 Spark 中执行相同的操作。我知道我可以将 Spark 数据帧转换为 pandas,但这似乎效率不高。
本机 Spark 函数可以对每一列进行此类聚合。
以下数据框包含 null、nans 和 zeros 的百分比。
df2 = df1.select(
[(F.count(F.when(F.isnan(c) | F.col(c).isNull() | (F.col(c) == 0), c))
/ F.count(F.lit(1))).alias(c)
for c in df1.columns]
)
举个例子:
from pyspark.sql import functions as F
df1 = spark.createDataFrame(
[(1000, 0, None),
(None, 2, None),
(None, 3, 2222),
(None, 4, 2233),
(None, 5, 2244)],
['c1', 'c2', 'c3'])
df2 = df1.select(
[(F.count(F.when(F.isnan(c) | F.col(c).isNull() | (F.col(c) == 0), c))
/ F.count(F.lit(1))).alias(c)
for c in df1.columns]
)
df2.show()
# +---+---+---+
# | c1| c2| c3|
# +---+---+---+
# |0.8|0.2|0.4|
# +---+---+---+
剩下的就是从 df1 中选择列:
df = df1.select([c for c in df1.columns if df2.head()[c] < .7])
df.show()
# +---+----+
# | c2| c3|
# +---+----+
# | 0|null|
# | 2|null|
# | 3|2222|
# | 4|2233|
# | 5|2244|
# +---+----+
百分比是根据这个条件计算出来的,根据需要修改:
F.isnan(c) | F.col(c).isNull() | (F.col(c) == 0)
这会将 None 替换为 np.nan:
df.fillna(np.nan)
这会将指定值替换为 np.nan:
df.replace(-666666666, np.nan)
第一次回答,请多多包涵。此函数应该使用 pyspark 数据帧而不是 pandas 数据帧,并且应该给你类似的结果:
def clean_census_data(df):
"""
This function cleans the dataframe and drops columns that contain 70% NaN values
"""
# Replace None or 0 with np.nan
df = df.replace('None', None)
# Replace weird numbers
df = df.replace(-666666666.0, None)
# Drop columns that contain 70% NaN or 0 values
selection_dict = df.select([(count(when(isnan(c) | col(c).isNull() | (col(c).cast('int') == 0), c))/count(c) > .7).alias(c) for c in df.columns]).first().asDict()
columns_to_remove = [name for name, is_selected in selection_dict.items() if is_selected]
df = df.drop(*columns_to_remove)
return df
注意:生成的数据帧包含 None 而不是 np.nan。