比较并删除 Spark / PySpark 中不一致数组的数据框列中的元素

Compare and remove elements out of dataframe columns of inconsistent arrays in Spark / PySpark

我是 Spark 的新手,我找不到解决问题的方法,非常感谢任何建议或帮助。

我有一个 Pyspark.sql.dataframe,其中包含两个包含字符串的数组列。两个列数组的长度不一致,一些行也有 Null 条目。我需要比较这两列,并且必须为 B 列中的每一行删除数组的一个元素,当它在 OVERRIDE 列的数组的该行中找到时。

+---------------+---------------+
|    OVERRIDE   |         B     |
+---------------+---------------+
|          ['a']|      ['a','b']|
|           null|          ['b']|
|           null|      ['a','c']|
|      ['d','g']|      ['d','g']|
|           null|           null|
|          ['f']|          ['f']|
+---------------+---------------+

最后应该是这样的:

+---------------+---------------+
|    OVERRIDE   |         B     |
+---------------+---------------+
|          ['a']|          ['b']|
|           null|          ['b']|
|           null|      ['a','c']|
|      ['d','g']|           null|
|           null|           null|
|          ['f']|           null|
+---------------+---------------+

我试过

from pyspark.sql.functions import array_remove, array_intersect

df = df.withColumn('B', array_remove(df.B, df.OVERRIDE))

还有

df = df.withColumn('B', array_remove(df.B, array_intersect(df.OVERRIDE, df.B)))

但得知 array_remove() 不能遍历该列,而只能取一个元素(例如 'a')然后在 B 列的所有行中删除它。

我是否必须构建一个 udf 函数?如果是,我应该怎么做?

您可以使用 udf

@udf(returnType=ArrayType(StringType()))
def removeFromRight(override,b):
if(override==None or b==None):
    return b

filtered_list=[x for x in b if x not in override]
if(len(filtered_list)==0):
    filtered_list=None
return filtered_list

test1=test.withColumn("new_overridden_col",removeFromRight(col("override"),col("b")))    
test1.show()

//output of test1
+--------+------+------------------+
|override|     b|new_overridden_col|
+--------+------+------------------+
|     [a]|[a, b]|               [b]|
|    null|   [b]|               [b]|
|    null|[a, c]|            [a, c]|
|  [d, g]|  null|              null|
|    null|  null|              null|
|     [f]|  null|              null|
+--------+------+------------------+

如果您使用的是 Spark >= 2.4.0,则可以使用内置 array_except(a, b)。该函数将 return 存在于 a 但不存在于 b 中的所有项目。虽然该函数仅在两个参数都不是空值时才有效,因此在使用它之前我们需要将 null 替换为空数组.

这里是 python 版本:

from pyspark.sql.functions import array_except, when, array, col

df = spark.createDataFrame([
  [["a"], ["a", "b"]],
 [None, ["b"]],
 [None, ["a", "c"]],
 [["d", "g"], ["d", "g"]],
 [["f"], ["f"]]
], ["OVERRIDE", "B"])

df.withColumn("OVERRIDE", when(col("OVERRIDE").isNull(), array()).otherwise(col("OVERRIDE"))) \
  .withColumn("diff", array_except(col("B"), col("OVERRIDE"))) \
  .show()

// +--------+------+------+
// |OVERRIDE|     B|  diff|
// +--------+------+------+
// |     [a]|[a, b]|   [b]|
// |      []|   [b]|   [b]|
// |      []|[a, c]|[a, c]|
// |  [d, g]|[d, g]|    []|
// |     [f]|   [f]|    []|
// +--------+------+------+

还有 Scala:

import org.apache.spark.sql.functions.{array_except, when, array}

val df = Seq(
 (Seq("a"), Seq("a", "b")),
 (null, Seq("b")),
 (null, Seq("a", "c")),
 (Seq("d", "g"), Seq("d", "g")),
 (Seq("f"), Seq("f"))
).toDF("OVERRIDE", "B")

df.withColumn("OVERRIDE", when($"OVERRIDE".isNull, array()).otherwise($"OVERRIDE"))
  .withColumn("diff", array_except($"B", $"OVERRIDE"))
  .show

// +--------+------+------+
// |OVERRIDE|     B|  diff|
// +--------+------+------+
// |     [a]|[a, b]|   [b]|
// |      []|   [b]|   [b]|
// |      []|[a, c]|[a, c]|
// |  [d, g]|[d, g]|    []|
// |     [f]|   [f]|    []|
// +--------+------+------+