PySpark 中的多重过滤

Multiple Filtering in PySpark

我已经导入了一个数据集到Juputer notebook/PySpark中通过EMR处理,例如:

data sample

我想在使用过滤器功能之前清理数据。这包括:

  1. 删除空白或“0”或 NA 成本或日期的行。我认为过滤器类似于:.filter(lambda (a,b,c,d): b = ?, c % 1 == c, d = ?)。我不确定如何过滤水果并储存在这里。
  2. 删除不正确的值,例如“3”不是水果名称。这对数字来说很容易(只是数字 % 1 == 数字),但我不确定它会如何过滤掉单词。
  3. 删除统计异常值的行,即与平均值相差 3 个标准差的行。所以这里的单元格 C4 显然需要删除,但我不确定如何将此逻辑合并到过滤器中。

我是否需要一次执行一个过滤器,或者有没有办法一次性过滤数据集(以 lambda 表示法)?

或者,写一个 Spark SQL 查询会更容易吗,它在 'where' 子句中有很多过滤器(但是上面的#3 仍然很难写在 SQL).

如果您阅读文档,http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter,它是这样写的

where() is an alias for filter().

因此,对于多个条件,您也可以安全地使用 'filter' 而不是 'where'。

编辑: 如果你想在很多条件下过滤很多列,我更喜欢这种方法。

from dateutil.parser import parse
import pyspark.sql.functions as F

def is_date(string):
    try: 
       parse(string)
       return True
    except ValueError:
       return False
def check_date(d):
    if is_date(d):
        return d
    else:
        return None

date_udf = F.udf(check_date,StrinType())

def check_fruit(name):
    fruits_list #create a list of fruits(can easily find it on google)
                #difficult filtering words otherwise
                #try checking from what you want, rest will be filtered
    if name in fruits_list:
        return name
    else:
        return None

fruit_udf = F.udf(check_fruit,StringType())

def check_cost(value):
    mean, std #calculcated beforehand
    threshold_upper = mean + (3*std)
    threhold_lower = mean - (3*std)

    if value > threhold_lower and value < threshold_upper:
        return value
    else:
        return None
cost_udf = F.udf(check_cost,StringType())        

#Similarly create store_udf

df = df.select([date_udf(F.col('date')).alias('date'),\
            fruit_udf(F.col('fruit')).alias('fruit'),\
            cost_udf(F.col('cost')).alias('cost'),\
            store_udf(F.col('store')).alias('store')]).dropna()

这将导致所有列一起工作。