如何在 Pyspark 中迭代数组(字符串)以进行 Null/Blank 值检查

How to iterate an array(string) for Null/Blank value check in Pyspark

我是 Pyspark 的新手。 通过使用下面的数据框,我如何根据“_Value”字段将它分成两个不同的数据框。如果 _Value 是 array(string) 中有任何 null 或空白元素,那么它应该转到一个数据帧,其余的到另一个数据帧。

+----+-----+-----+-------------------------------+-------------+--------------+
|key |Size |Color|AdditionalAttributeMetric      |_Name        |_Value        |
+----+-----+-----+-------------------------------+-------------+--------------+
|123k|BLACK|black|[Size -> BLACK, Color -> black]|[Size, Color]|[BLACK, black]|
|123k|WHITE|null |[Size -> WHITE, Color ->]      |[Size, Color]|[WHITE,]      |
+----+-----+-----+-------------------------------+-------------+--------------+

下面是完整的代码,但是它抛出了一个错误"Column is not iterable"。

from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import lit, col, create_map
from itertools import chain

rdd = sc.parallelize([('123k', 'BLACK', 'black'),
                      ('123k', 'WHITE', None)
                          ])

schema = StructType([StructField('key', StringType(), True),
                     StructField('Size', StringType(), True),
                     StructField('Color', StringType(), True)])

df_struct = sqlContext.createDataFrame(rdd, schema)

df_struct_subattri = df_struct.select("Size", "Color")

AdditionalAttributeMetric = create_map(
            list(chain(*((lit(name), col(name)) for name in df_struct_subattri.columns)))).alias(
            "AdditionalAttributeMetric")

df_struct = df_struct.select("*", AdditionalAttributeMetric)

df_struct = df_struct.withColumn("_Name", map_keys(col("AdditionalAttributeMetric")))
df_struct = df_struct.withColumn("_Value", map_values(col("AdditionalAttributeMetric")))


df_struct1 = df_struct.select("*").where(array_contains (col("_Value"), '') | array_contains (col("_Value"), lit(None)))

df_struct1.show(truncate = False)

感谢任何形式的帮助。

示例数据有 1 行干净,1 行 None,1 行 ''

from pyspark.sql import function as F
df_struct.show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
|123k|      [WHITE,]|
|124k|      [BLUE, ]|
+----+--------------+

如果你没有spark2.4,你可以使用array_contains来检查空字符串。如果任何行中有 null,则 array_contains 的输出将为 null,或者如果其中有空字符串 "",则输出将为 true。然后,您可以 filter 在新的 boolean 列上,如下所示。

df.withColumn("boolean", F.array_contains("_value", ""))\
  .filter(~((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+

您可以删除 ~ 以获取包含 无或空字符串 .

的所有其他行
df.withColumn("boolean", F.array_contains("_value", ""))\
  .filter(((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()

+----+--------+
| key|  _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+

Spark2.4:

可以用高阶函数数组filter取出None'',然后比较size 数据框 filter.

df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
  .filter((F.size("_value2")==F.size("_value"))).drop("_value2").show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+

获取其他行,这些行有 Nones 或 '' 或两者。您可以将 ~ 放在过滤器表达式的前面。

df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
  .filter(~(F.size("_value2")==F.size("_value"))).drop("_value2").show()

+----+--------+
| key|  _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+

您可以也可以使用高阶函数exists.

df.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
  .filter(~(F.col("boolean")==True)).drop("boolean").show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+

删除 ~ 以获取包含 Nones"" 的所有行:

df.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
  .filter((F.col("boolean")==True)).drop("boolean").show()

+----+--------+
| key|  _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+