PySpark 中的多重过滤
Multiple Filtering in PySpark
我已经导入了一个数据集到Juputer notebook/PySpark中通过EMR处理,例如:
data sample
我想在使用过滤器功能之前清理数据。这包括:
- 删除空白或“0”或 NA 成本或日期的行。我认为过滤器类似于:.filter(lambda (a,b,c,d): b = ?, c % 1 == c, d = ?)。我不确定如何过滤水果并储存在这里。
- 删除不正确的值,例如“3”不是水果名称。这对数字来说很容易(只是数字 % 1 == 数字),但我不确定它会如何过滤掉单词。
- 删除统计异常值的行,即与平均值相差 3 个标准差的行。所以这里的单元格 C4 显然需要删除,但我不确定如何将此逻辑合并到过滤器中。
我是否需要一次执行一个过滤器,或者有没有办法一次性过滤数据集(以 lambda 表示法)?
或者,写一个 Spark SQL 查询会更容易吗,它在 'where' 子句中有很多过滤器(但是上面的#3 仍然很难写在 SQL).
如果您阅读文档,http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter,它是这样写的
where() is an alias for filter().
因此,对于多个条件,您也可以安全地使用 'filter' 而不是 'where'。
编辑:
如果你想在很多条件下过滤很多列,我更喜欢这种方法。
from dateutil.parser import parse
import pyspark.sql.functions as F
def is_date(string):
try:
parse(string)
return True
except ValueError:
return False
def check_date(d):
if is_date(d):
return d
else:
return None
date_udf = F.udf(check_date,StrinType())
def check_fruit(name):
fruits_list #create a list of fruits(can easily find it on google)
#difficult filtering words otherwise
#try checking from what you want, rest will be filtered
if name in fruits_list:
return name
else:
return None
fruit_udf = F.udf(check_fruit,StringType())
def check_cost(value):
mean, std #calculcated beforehand
threshold_upper = mean + (3*std)
threhold_lower = mean - (3*std)
if value > threhold_lower and value < threshold_upper:
return value
else:
return None
cost_udf = F.udf(check_cost,StringType())
#Similarly create store_udf
df = df.select([date_udf(F.col('date')).alias('date'),\
fruit_udf(F.col('fruit')).alias('fruit'),\
cost_udf(F.col('cost')).alias('cost'),\
store_udf(F.col('store')).alias('store')]).dropna()
这将导致所有列一起工作。
我已经导入了一个数据集到Juputer notebook/PySpark中通过EMR处理,例如:
data sample
我想在使用过滤器功能之前清理数据。这包括:
- 删除空白或“0”或 NA 成本或日期的行。我认为过滤器类似于:.filter(lambda (a,b,c,d): b = ?, c % 1 == c, d = ?)。我不确定如何过滤水果并储存在这里。
- 删除不正确的值,例如“3”不是水果名称。这对数字来说很容易(只是数字 % 1 == 数字),但我不确定它会如何过滤掉单词。
- 删除统计异常值的行,即与平均值相差 3 个标准差的行。所以这里的单元格 C4 显然需要删除,但我不确定如何将此逻辑合并到过滤器中。
我是否需要一次执行一个过滤器,或者有没有办法一次性过滤数据集(以 lambda 表示法)?
或者,写一个 Spark SQL 查询会更容易吗,它在 'where' 子句中有很多过滤器(但是上面的#3 仍然很难写在 SQL).
如果您阅读文档,http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter,它是这样写的
where() is an alias for filter().
因此,对于多个条件,您也可以安全地使用 'filter' 而不是 'where'。
编辑: 如果你想在很多条件下过滤很多列,我更喜欢这种方法。
from dateutil.parser import parse
import pyspark.sql.functions as F
def is_date(string):
try:
parse(string)
return True
except ValueError:
return False
def check_date(d):
if is_date(d):
return d
else:
return None
date_udf = F.udf(check_date,StrinType())
def check_fruit(name):
fruits_list #create a list of fruits(can easily find it on google)
#difficult filtering words otherwise
#try checking from what you want, rest will be filtered
if name in fruits_list:
return name
else:
return None
fruit_udf = F.udf(check_fruit,StringType())
def check_cost(value):
mean, std #calculcated beforehand
threshold_upper = mean + (3*std)
threhold_lower = mean - (3*std)
if value > threhold_lower and value < threshold_upper:
return value
else:
return None
cost_udf = F.udf(check_cost,StringType())
#Similarly create store_udf
df = df.select([date_udf(F.col('date')).alias('date'),\
fruit_udf(F.col('fruit')).alias('fruit'),\
cost_udf(F.col('cost')).alias('cost'),\
store_udf(F.col('store')).alias('store')]).dropna()
这将导致所有列一起工作。