在某些列上为投票系统过滤 spark 数据框的最佳 PySpark 实践是什么?

What is the best PySpark practice to filter spark dataframe for voting system on certain columns?

我正在努力在 spark 数据帧上创建一个 高效 投票系统,以应用于某些列以选择所需的 records/rows 。假设我的数据如下所示:

+----------+-----------+-------------+----------+----------+
|   Names  |     A     |      B      |     C    |    D     |
+----------+-----------+-------------+----------+----------+
|        X1|       true|         true|      true|      true|
|        X5|      false|         true|      true|     false|
|        X2|      false|        false|     false|     false|
|        X3|       true|         true|      true|     false|
|        X4|      false|        false|     false|      true|
|        X5|      false|         true|      true|     false|
|        X3|       true|         true|      true|     false|
+----------+-----------+-------------+----------+----------+

我想创建一个新的列名称 majority,它计算每行的 true 个数,如果它是:

def get_mode(df):
    counts = df.groupBy(['Names', 'A', 'B', 'C', 'D']).count().alias('count')
    #counts.show()
    win = Window().partitionBy('A', 'B', 'C', 'D').orderBy(F.col('count').desc())
    result = (counts
              .withColumn('majority', F.rank().over(win))
              #.where(F.col('majority) == 1)
              #.select('x', 'y')
             )
    #result.show()
        
    return result
df = get_mode(df)

def voting_sys(df):
    partition_columns = df.columns[1:]
    vote_results = (df  
    .withColumn('majority', F
        .over(Window.partitionBy(partition_columns))
        .when(F.isnull('majority'), '-')
        .when(F.regexp_extract('majority', '(?i)^true', 0) >  F.regexp_extract('majority', '(?i)^false', 0), 'abnormal')   
        .when(F.regexp_extract('majority', '(?i)^true', 0) == F.regexp_extract('majority', '(?i)^false', 0), '50-50')       
        .when(F.regexp_extract('majority', '(?i)^true', 0) <  F.regexp_extract('majority', '(?i)^false', 0), 'normal') 
        #.otherwise('normal') 
                                                                                  
    #.show()
)
        
    return vote_results

注意:我对使用 df.toPandas() 破解它不感兴趣。

我的建议是将布尔值转换为整数,然后取四列的平均值,得到一个介于 0 和 1 之间的数字。

# if the values are stored as strings, you'll need a UDF to convert bool:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def true_false(s):
    return int(s.lower() == 'true')

tf_udf = udf(lambda x: true_false(x), IntegerType())

for c in ['A', 'B', 'C', 'D']:
    df.withColumn(c, tf_udf(col(c)))

df.withColumn('majority', (df.A + df.B + df.C + df.D) / 4)

我喜欢@LiamFiddler 将布尔值转换为整数的想法。但是,我不推荐在这里使用UDF,没有必要。

首先,您可以从字符串转换为布尔值,然后从布尔值转换为整数,就像这样F.col(c).cast('boolean').cast('int'),我相信这种转换并不像它可能的那样昂贵听起来像。

其次,您不必在此处对列(A、B、C、D)进行硬编码,您可以求和 sum(F.col(c) for c in cols]

这是我的工作代码

cols = df.columns[1:]
# ['A', 'B', 'C', 'D']

(df
    .withColumn('sum', sum([F.col(c).cast('boolean').cast('int') for c in cols]))
    .withColumn('majority', F
        .when(F.col('sum')  > len(cols) / 2, 'abnormal')
        .when(F.col('sum') == len(cols) / 2, '50-50')
        .when(F.col('sum')  < len(cols) / 2, 'normal')
    )
    
    # order by abnormal, 50-50, normal
    .orderBy(F
        .when(F.col('majority') == 'abnormal', 1)
        .when(F.col('majority') == '50-50', 2)
        .when(F.col('majority') == 'normal', 3)
    )
    .show()
)

# Output
# +-----+-----+-----+-----+-----+---+--------+
# |Names|    A|    B|    C|    D|sum|majority|
# +-----+-----+-----+-----+-----+---+--------+
# |   X3| true| true| true|false|  3|abnormal|
# |   X3| true| true| true|false|  3|abnormal|
# |   X1| true| true| true| true|  4|abnormal|
# |   X5|false| true| true|false|  2|   50-50|
# |   X5|false| true| true|false|  2|   50-50|
# |   X4|false|false|false| true|  1|  normal|
# |   X2|false|false|false|false|  0|  normal|
# +-----+-----+-----+-----+-----+---+--------+