PySpark: how to write a UDF using two columns
rdd = sc.parallelize([(['a', 'b', 'c'], 'c'),
                      (['h', 'j', 's'], 'j'),
                      (['w', 'x', 'a'], 'a'),
                      (['o', 'b', 'e'], 'c')])
df = spark.createDataFrame(rdd, ['seq', 'target'])
df.show()
+---------+------+
| seq|target|
+---------+------+
|[a, b, c]| c|
|[h, j, s]| j|
|[w, x, a]| a|
|[o, b, e]| c|
+---------+------+
I want to write a UDF that removes the target from seq:
+---------+------+---------+
| seq|target| filtered|
+---------+------+---------+
|[a, b, c]| c| [a, b]|
|[h, j, s]| j| [h, s]|
|[w, x, a]| a| [w, x]|
|[o, b, e]| c|[o, b, e]|
+---------+------+---------+
Note that this is only a demonstration; the real case is more complicated. What I'm after is a general way to process one column (e.g. seq) using another column (e.g. target) as a parameter. Is there any generic solution?
You can use array_remove:
import pyspark.sql.functions as F
df2 = df.withColumn('filtered', F.expr('array_remove(seq, target)'))
df2.show()
+---------+------+---------+
| seq|target| filtered|
+---------+------+---------+
|[a, b, c]| c| [a, b]|
|[h, j, s]| j| [h, s]|
|[w, x, a]| a| [w, x]|
|[o, b, e]| c|[o, b, e]|
+---------+------+---------+
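The F.expr form is used above because, depending on the Spark version, the Python F.array_remove(col, element) helper may only accept a literal as element rather than another column; the SQL expression sidesteps that. On Spark 3.1+ the same logic can also be written with the higher-order filter function. A minimal sketch, assuming Spark 3.1 or later:

import pyspark.sql.functions as F

# filter(seq, x -> x != target): keep every element of seq that differs from target
df2 = df.withColumn('filtered', F.filter('seq', lambda x: x != F.col('target')))
df2.show()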
If you are looking for a UDF solution:
@F.udf('array<string>')
def array_remove(col1, col2):
    # keep the elements of col1 that differ from col2
    return list(filter(lambda x: x != col2, col1))

df2 = df.withColumn('filtered', array_remove('seq', 'target'))
df2.show()
+---------+------+---------+
| seq|target| filtered|
+---------+------+---------+
|[a, b, c]| c| [a, b]|
|[h, j, s]| j| [h, s]|
|[w, x, a]| a| [w, x]|
|[o, b, e]| c|[o, b, e]|
+---------+------+---------+
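For larger data, the same two-column logic can also run as a vectorized pandas UDF, which cuts the per-row Python overhead of a plain UDF. A minimal sketch, assuming Spark 3.0+ (for type-hinted pandas UDFs) with pyarrow installed; array_remove_pd is my own name for it:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('array<string>')
def array_remove_pd(seq: pd.Series, target: pd.Series) -> pd.Series:
    # drop, from each array, the value in the matching target row
    return pd.Series([[x for x in s if x != t] for s, t in zip(seq, target)])

df2 = df.withColumn('filtered', array_remove_pd('seq', 'target'))
df2.show()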