基于条件组合值的 PySpark 设计模式

PySpark Design Pattern for Combining Values Based on Criteria

你好我是 PySpark 的新手,我想创建一个函数,它接受一个 table 重复行和一个字典 {field_names : ["the source" : "the approach for getting记录"]} 作为输入并创建新记录。新记录将等于优先级列表中的第一个非空值,其中每个“方法”都是一个函数。

例如,特定组件的输入 table 如下所示:

并给出这个优先级命令:

输出记录应如下所示:

新记录看起来像这样,因为对于每个字段,都选择了一个函数来指示如何选择值。 (例如,phone 等于 0.75,因为亚马逊最完整的记录为空,因此您合并到列表中的下一个方法,即 phone 的值以获得 Google = 的最完整记录0.75)

本质上,我想编写一个按组件分组的 pyspark 函数,然后为每一列应用适当的函数以获得正确的值。虽然我有一个“有效”的功能,但时间复杂度非常糟糕,因为我天真地循环遍历每个组件,然后遍历每一列,然后遍历列表中的每种方法来构建记录。

非常感谢任何帮助!

对于一些复杂的使用模式,我认为您可以使用 pyspark.sql.functions.when . See this blog post 来解决这个问题。你会想要按 id 分组,然后使用 when 语句来实现你的逻辑。例如,'title': {'source': 'Google', 'approach': 'first record'} 可以实现为

(df.groupBy('id').agg(
    when(col("source") == lit("Google"), first("title") ).otherwise("null").alias("title" )
)

'Most recent' 和 'most complete' 更复杂,可能需要一些 self-joins,但您仍然应该能够使用 when 子句来获取所需的聚合。