PySpark 函数基于多列数据框创建自定义输出
PySpark function to create custom output based on multiple columns of dataframe
我有一个按照以下结构的源 pyspark 数据框:
一个
B
C
D
E
F
G
145
589
1
1
12
25
145
589
1
2
1ad34
145
589
1
3
257
18
55
145
589
2
1
12
25
145
589
2
2
22
45
145
589
2
3
145
589
3
1
32
55
145
589
3
2
Table概览:
- A 和 B 列的组合将具有索引 C 列。对于每个索引 C 列,我们将有 D 列。 A|B|C|D 的串联标识唯一记录。
- 对于下面的完整,dataframe检查是否在dataframe记录遍历的任何点设置了E列。如果是,return 第一个数值(例如 257 应该结果而 1ad34 应该被忽略)这将是优先级 1 操作。
- 如果从未设置 E 列,return最后一行组合的 F 和 G 的串联。如果永远不会在 E 列上设置 257,则 return 基于 145|589|3|1 的 3255。
测试用例 1:优先级列 E 包含的值很少。第一个数字是 257。所以对于 145|589,我们的输出应该是 257。
测试用例 2:优先级列 E 完全为空,然后选取 F 和 G 列的最后一个串联值,对于 145|589,结果应为 3255
我已经为此实现了一个 pyspark 代码,如下所示:
def get_resulting_id(grouped_A_B_df):
try :
out=''
first_E_val_df=grouped_A_B_df.filter(col("E").cast("int").isNotNull()).first()
if ( first_E_val_df):
return first_E_val_df["E"]
unique_C = [x.C for x in grouped_A_B_df.select('C').distinct().collect()]
for uniq in unique_C :
for row in uniq.rdd.toLocalIterator():
out=str(row['F'])+str(row['G'])
except:
raise Exception("Func failed")
return out
由于源数据帧有 2000 万条记录,我不想在 priority2 条件下使用 localiterator,任何可能的方法来加速操作。按 A 列和 B 列的组合分区的源数据帧将给出子集数据帧。我希望我的自定义函数应用于该子集数据帧和 return 每个子集数据帧的结果。
根据您提供的示例输入数据,不确定您的预期输出到底是什么。我试过你的函数,输出是“257”,所以这是我的完整 pyspark 代码,应该提供相同的输出:
from pyspark.sql import functions as F, Window as W
df.select(
"A",
"B",
F.coalesce(
F.first("E", ignorenulls=True).over(
W.partitionBy("A", "B")
.orderBy("C", "D")
.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
),
F.last(F.concat(F.col("F"), F.col("G")), ignorenulls=True).over(
W.partitionBy("A", "B")
.orderBy("C", "D")
.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
),
).alias("out"),
).distinct().show()
+---+---+---+
| A| B|out|
+---+---+---+
|145|589|257|
+---+---+---+
如果您需要 pandas df 作为输出,您可以将 .show()
替换为 .toPandas()
。
我有一个按照以下结构的源 pyspark 数据框:
一个 | B | C | D | E | F | G |
---|---|---|---|---|---|---|
145 | 589 | 1 | 1 | 12 | 25 | |
145 | 589 | 1 | 2 | 1ad34 | ||
145 | 589 | 1 | 3 | 257 | 18 | 55 |
145 | 589 | 2 | 1 | 12 | 25 | |
145 | 589 | 2 | 2 | 22 | 45 | |
145 | 589 | 2 | 3 | |||
145 | 589 | 3 | 1 | 32 | 55 | |
145 | 589 | 3 | 2 |
Table概览:
- A 和 B 列的组合将具有索引 C 列。对于每个索引 C 列,我们将有 D 列。 A|B|C|D 的串联标识唯一记录。
- 对于下面的完整,dataframe检查是否在dataframe记录遍历的任何点设置了E列。如果是,return 第一个数值(例如 257 应该结果而 1ad34 应该被忽略)这将是优先级 1 操作。
- 如果从未设置 E 列,return最后一行组合的 F 和 G 的串联。如果永远不会在 E 列上设置 257,则 return 基于 145|589|3|1 的 3255。
测试用例 1:优先级列 E 包含的值很少。第一个数字是 257。所以对于 145|589,我们的输出应该是 257。
测试用例 2:优先级列 E 完全为空,然后选取 F 和 G 列的最后一个串联值,对于 145|589,结果应为 3255
我已经为此实现了一个 pyspark 代码,如下所示:
def get_resulting_id(grouped_A_B_df):
try :
out=''
first_E_val_df=grouped_A_B_df.filter(col("E").cast("int").isNotNull()).first()
if ( first_E_val_df):
return first_E_val_df["E"]
unique_C = [x.C for x in grouped_A_B_df.select('C').distinct().collect()]
for uniq in unique_C :
for row in uniq.rdd.toLocalIterator():
out=str(row['F'])+str(row['G'])
except:
raise Exception("Func failed")
return out
由于源数据帧有 2000 万条记录,我不想在 priority2 条件下使用 localiterator,任何可能的方法来加速操作。按 A 列和 B 列的组合分区的源数据帧将给出子集数据帧。我希望我的自定义函数应用于该子集数据帧和 return 每个子集数据帧的结果。
根据您提供的示例输入数据,不确定您的预期输出到底是什么。我试过你的函数,输出是“257”,所以这是我的完整 pyspark 代码,应该提供相同的输出:
from pyspark.sql import functions as F, Window as W
df.select(
"A",
"B",
F.coalesce(
F.first("E", ignorenulls=True).over(
W.partitionBy("A", "B")
.orderBy("C", "D")
.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
),
F.last(F.concat(F.col("F"), F.col("G")), ignorenulls=True).over(
W.partitionBy("A", "B")
.orderBy("C", "D")
.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
),
).alias("out"),
).distinct().show()
+---+---+---+
| A| B|out|
+---+---+---+
|145|589|257|
+---+---+---+
如果您需要 pandas df 作为输出,您可以将 .show()
替换为 .toPandas()
。