PySpark 函数基于多列数据框创建自定义输出

PySpark function to create custom output based on multiple columns of dataframe

我有一个按照以下结构的源 pyspark 数据框:

一个 B C D E F G
145 589 1 1 12 25
145 589 1 2 1ad34
145 589 1 3 257 18 55
145 589 2 1 12 25
145 589 2 2 22 45
145 589 2 3
145 589 3 1 32 55
145 589 3 2

Table概览:

  1. A 和 B 列的组合将具有索引 C 列。对于每个索引 C 列,我们将有 D 列。 A|B|C|D 的串联标识唯一记录。
  2. 对于下面的完整,dataframe检查是否在dataframe记录遍历的任何点设置了E列。如果是,return 第一个数值(例如 257 应该结果而 1ad34 应该被忽略)这将是优先级 1 操作。
  3. 如果从未设置 E 列,return最后一行组合的 F 和 G 的串联。如果永远不会在 E 列上设置 257,则 return 基于 145|589|3|1 的 3255。

测试用例 1:优先级列 E 包含的值很少。第一个数字是 257。所以对于 145|589,我们的输出应该是 257。

测试用例 2:优先级列 E 完全为空,然后选取 F 和 G 列的最后一个串联值,对于 145|589,结果应为 3255

我已经为此实现了一个 pyspark 代码,如下所示:

def get_resulting_id(grouped_A_B_df):
    try :
        out=''
        first_E_val_df=grouped_A_B_df.filter(col("E").cast("int").isNotNull()).first()
        if ( first_E_val_df):
            return first_E_val_df["E"]
        unique_C = [x.C for x in grouped_A_B_df.select('C').distinct().collect()]
        for uniq in unique_C :
            for row in uniq.rdd.toLocalIterator():
                out=str(row['F'])+str(row['G'])
    except:
        raise Exception("Func failed")
    return out

由于源数据帧有 2000 万条记录,我不想在 priority2 条件下使用 localiterator,任何可能的方法来加速操作。按 A 列和 B 列的组合分区的源数据帧将给出子集数据帧。我希望我的自定义函数应用于该子集数据帧和 return 每个子集数据帧的结果。

根据您提供的示例输入数据,不确定您的预期输出到底是什么。我试过你的函数,输出是“257”,所以这是我的完整 pyspark 代码,应该提供相同的输出:

from pyspark.sql import functions as F, Window as W

df.select(
    "A",
    "B",
    F.coalesce(
        F.first("E", ignorenulls=True).over(
            W.partitionBy("A", "B")
            .orderBy("C", "D")
            .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
        ),
        F.last(F.concat(F.col("F"), F.col("G")), ignorenulls=True).over(
            W.partitionBy("A", "B")
            .orderBy("C", "D")
            .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
        ),
    ).alias("out"),
).distinct().show()

+---+---+---+                                                                   
|  A|  B|out|
+---+---+---+
|145|589|257|
+---+---+---+

如果您需要 pandas df 作为输出,您可以将 .show() 替换为 .toPandas()