pyspark 如何使用两列编写 UDF

pyspark how to write UDF using two columns

rdd = sc.parallelize( [(['a','b','c'], 'c'), \
                       (['h','j','s'], 'j'), \
                       (['w','x','a'], 'a'), \
                       (['o','b','e'], 'c')] )

df = spark.createDataFrame(rdd, ['seq','target'])

+---------+------+
|      seq|target|
+---------+------+
|[a, b, c]|     c|
|[h, j, s]|     j|
|[w, x, a]|     a|
|[o, b, e]|     c|
+---------+------+

我想写一个 UDF 来从 seq 中删除目标。

+---------+------+---------+
|      seq|target| filtered|
+---------+------+---------+
|[a, b, c]|     c|   [a, b]|
|[h, j, s]|     j|   [h, s]|
|[w, x, a]|     a|   [w, x]|
|[o, b, e]|     c|[o, b, e]|
+---------+------+---------+

请注意,这只是一个展示。实际情况更为复杂。我想通过使用另一列(例如 target)作为参数来获得处理一列(例如 seq)的正式方法。 任何通用解决方案?

您可以使用 array_remove:

import pyspark.sql.functions as F

df2 = df.withColumn('filtered', F.expr('array_remove(seq, target)'))

df2.show()
+---------+------+---------+
|      seq|target| filtered|
+---------+------+---------+
|[a, b, c]|     c|   [a, b]|
|[h, j, s]|     j|   [h, s]|
|[w, x, a]|     a|   [w, x]|
|[o, b, e]|     c|[o, b, e]|
+---------+------+---------+

如果您正在寻找 UDF 解决方案,

@F.udf('array<string>')
def array_remove(col1, col2):
    return list(filter(lambda x: x != col2, col1))

df2 = df.withColumn('filtered', array_remove('seq', 'target'))

df2.show()
+---------+------+---------+
|      seq|target| filtered|
+---------+------+---------+
|[a, b, c]|     c|   [a, b]|
|[h, j, s]|     j|   [h, s]|
|[w, x, a]|     a|   [w, x]|
|[o, b, e]|     c|[o, b, e]|
+---------+------+---------+