比较并删除 Spark / PySpark 中不一致数组的数据框列中的元素
Compare and remove elements out of dataframe columns of inconsistent arrays in Spark / PySpark
我是 Spark 的新手,我找不到解决问题的方法,非常感谢任何建议或帮助。
我有一个 Pyspark.sql.dataframe,其中包含两个包含字符串的数组列。两个列数组的长度不一致,一些行也有 Null 条目。我需要比较这两列,并且必须为 B 列中的每一行删除数组的一个元素,当它在 OVERRIDE 列的数组的该行中找到时。
+---------------+---------------+
| OVERRIDE | B |
+---------------+---------------+
| ['a']| ['a','b']|
| null| ['b']|
| null| ['a','c']|
| ['d','g']| ['d','g']|
| null| null|
| ['f']| ['f']|
+---------------+---------------+
最后应该是这样的:
+---------------+---------------+
| OVERRIDE | B |
+---------------+---------------+
| ['a']| ['b']|
| null| ['b']|
| null| ['a','c']|
| ['d','g']| null|
| null| null|
| ['f']| null|
+---------------+---------------+
我试过
from pyspark.sql.functions import array_remove, array_intersect
df = df.withColumn('B', array_remove(df.B, df.OVERRIDE))
还有
df = df.withColumn('B', array_remove(df.B, array_intersect(df.OVERRIDE, df.B)))
但得知 array_remove() 不能遍历该列,而只能取一个元素(例如 'a')然后在 B 列的所有行中删除它。
我是否必须构建一个 udf 函数?如果是,我应该怎么做?
您可以使用 udf
@udf(returnType=ArrayType(StringType()))
def removeFromRight(override,b):
if(override==None or b==None):
return b
filtered_list=[x for x in b if x not in override]
if(len(filtered_list)==0):
filtered_list=None
return filtered_list
test1=test.withColumn("new_overridden_col",removeFromRight(col("override"),col("b")))
test1.show()
//output of test1
+--------+------+------------------+
|override| b|new_overridden_col|
+--------+------+------------------+
| [a]|[a, b]| [b]|
| null| [b]| [b]|
| null|[a, c]| [a, c]|
| [d, g]| null| null|
| null| null| null|
| [f]| null| null|
+--------+------+------------------+
如果您使用的是 Spark >= 2.4.0,则可以使用内置 array_except(a, b)。该函数将 return 存在于 a 但不存在于 b 中的所有项目。虽然该函数仅在两个参数都不是空值时才有效,因此在使用它之前我们需要将 null 替换为空数组.
这里是 python 版本:
from pyspark.sql.functions import array_except, when, array, col
df = spark.createDataFrame([
[["a"], ["a", "b"]],
[None, ["b"]],
[None, ["a", "c"]],
[["d", "g"], ["d", "g"]],
[["f"], ["f"]]
], ["OVERRIDE", "B"])
df.withColumn("OVERRIDE", when(col("OVERRIDE").isNull(), array()).otherwise(col("OVERRIDE"))) \
.withColumn("diff", array_except(col("B"), col("OVERRIDE"))) \
.show()
// +--------+------+------+
// |OVERRIDE| B| diff|
// +--------+------+------+
// | [a]|[a, b]| [b]|
// | []| [b]| [b]|
// | []|[a, c]|[a, c]|
// | [d, g]|[d, g]| []|
// | [f]| [f]| []|
// +--------+------+------+
还有 Scala:
import org.apache.spark.sql.functions.{array_except, when, array}
val df = Seq(
(Seq("a"), Seq("a", "b")),
(null, Seq("b")),
(null, Seq("a", "c")),
(Seq("d", "g"), Seq("d", "g")),
(Seq("f"), Seq("f"))
).toDF("OVERRIDE", "B")
df.withColumn("OVERRIDE", when($"OVERRIDE".isNull, array()).otherwise($"OVERRIDE"))
.withColumn("diff", array_except($"B", $"OVERRIDE"))
.show
// +--------+------+------+
// |OVERRIDE| B| diff|
// +--------+------+------+
// | [a]|[a, b]| [b]|
// | []| [b]| [b]|
// | []|[a, c]|[a, c]|
// | [d, g]|[d, g]| []|
// | [f]| [f]| []|
// +--------+------+------+
我是 Spark 的新手,我找不到解决问题的方法,非常感谢任何建议或帮助。
我有一个 Pyspark.sql.dataframe,其中包含两个包含字符串的数组列。两个列数组的长度不一致,一些行也有 Null 条目。我需要比较这两列,并且必须为 B 列中的每一行删除数组的一个元素,当它在 OVERRIDE 列的数组的该行中找到时。
+---------------+---------------+
| OVERRIDE | B |
+---------------+---------------+
| ['a']| ['a','b']|
| null| ['b']|
| null| ['a','c']|
| ['d','g']| ['d','g']|
| null| null|
| ['f']| ['f']|
+---------------+---------------+
最后应该是这样的:
+---------------+---------------+
| OVERRIDE | B |
+---------------+---------------+
| ['a']| ['b']|
| null| ['b']|
| null| ['a','c']|
| ['d','g']| null|
| null| null|
| ['f']| null|
+---------------+---------------+
我试过
from pyspark.sql.functions import array_remove, array_intersect
df = df.withColumn('B', array_remove(df.B, df.OVERRIDE))
还有
df = df.withColumn('B', array_remove(df.B, array_intersect(df.OVERRIDE, df.B)))
但得知 array_remove() 不能遍历该列,而只能取一个元素(例如 'a')然后在 B 列的所有行中删除它。
我是否必须构建一个 udf 函数?如果是,我应该怎么做?
您可以使用 udf
@udf(returnType=ArrayType(StringType()))
def removeFromRight(override,b):
if(override==None or b==None):
return b
filtered_list=[x for x in b if x not in override]
if(len(filtered_list)==0):
filtered_list=None
return filtered_list
test1=test.withColumn("new_overridden_col",removeFromRight(col("override"),col("b")))
test1.show()
//output of test1
+--------+------+------------------+
|override| b|new_overridden_col|
+--------+------+------------------+
| [a]|[a, b]| [b]|
| null| [b]| [b]|
| null|[a, c]| [a, c]|
| [d, g]| null| null|
| null| null| null|
| [f]| null| null|
+--------+------+------------------+
如果您使用的是 Spark >= 2.4.0,则可以使用内置 array_except(a, b)。该函数将 return 存在于 a 但不存在于 b 中的所有项目。虽然该函数仅在两个参数都不是空值时才有效,因此在使用它之前我们需要将 null 替换为空数组.
这里是 python 版本:
from pyspark.sql.functions import array_except, when, array, col
df = spark.createDataFrame([
[["a"], ["a", "b"]],
[None, ["b"]],
[None, ["a", "c"]],
[["d", "g"], ["d", "g"]],
[["f"], ["f"]]
], ["OVERRIDE", "B"])
df.withColumn("OVERRIDE", when(col("OVERRIDE").isNull(), array()).otherwise(col("OVERRIDE"))) \
.withColumn("diff", array_except(col("B"), col("OVERRIDE"))) \
.show()
// +--------+------+------+
// |OVERRIDE| B| diff|
// +--------+------+------+
// | [a]|[a, b]| [b]|
// | []| [b]| [b]|
// | []|[a, c]|[a, c]|
// | [d, g]|[d, g]| []|
// | [f]| [f]| []|
// +--------+------+------+
还有 Scala:
import org.apache.spark.sql.functions.{array_except, when, array}
val df = Seq(
(Seq("a"), Seq("a", "b")),
(null, Seq("b")),
(null, Seq("a", "c")),
(Seq("d", "g"), Seq("d", "g")),
(Seq("f"), Seq("f"))
).toDF("OVERRIDE", "B")
df.withColumn("OVERRIDE", when($"OVERRIDE".isNull, array()).otherwise($"OVERRIDE"))
.withColumn("diff", array_except($"B", $"OVERRIDE"))
.show
// +--------+------+------+
// |OVERRIDE| B| diff|
// +--------+------+------+
// | [a]|[a, b]| [b]|
// | []| [b]| [b]|
// | []|[a, c]|[a, c]|
// | [d, g]|[d, g]| []|
// | [f]| [f]| []|
// +--------+------+------+