如何在 Pyspark 中迭代数组(字符串)以进行 Null/Blank 值检查
How to iterate an array(string) for Null/Blank value check in Pyspark
我是 Pyspark 的新手。
通过使用下面的数据框,我如何根据“_Value”字段将它分成两个不同的数据框。如果 _Value 是 array(string) 中有任何 null 或空白元素,那么它应该转到一个数据帧,其余的到另一个数据帧。
+----+-----+-----+-------------------------------+-------------+--------------+
|key |Size |Color|AdditionalAttributeMetric |_Name |_Value |
+----+-----+-----+-------------------------------+-------------+--------------+
|123k|BLACK|black|[Size -> BLACK, Color -> black]|[Size, Color]|[BLACK, black]|
|123k|WHITE|null |[Size -> WHITE, Color ->] |[Size, Color]|[WHITE,] |
+----+-----+-----+-------------------------------+-------------+--------------+
下面是完整的代码,但是它抛出了一个错误"Column is not iterable"。
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import lit, col, create_map
from itertools import chain
rdd = sc.parallelize([('123k', 'BLACK', 'black'),
('123k', 'WHITE', None)
])
schema = StructType([StructField('key', StringType(), True),
StructField('Size', StringType(), True),
StructField('Color', StringType(), True)])
df_struct = sqlContext.createDataFrame(rdd, schema)
df_struct_subattri = df_struct.select("Size", "Color")
AdditionalAttributeMetric = create_map(
list(chain(*((lit(name), col(name)) for name in df_struct_subattri.columns)))).alias(
"AdditionalAttributeMetric")
df_struct = df_struct.select("*", AdditionalAttributeMetric)
df_struct = df_struct.withColumn("_Name", map_keys(col("AdditionalAttributeMetric")))
df_struct = df_struct.withColumn("_Value", map_values(col("AdditionalAttributeMetric")))
df_struct1 = df_struct.select("*").where(array_contains (col("_Value"), '') | array_contains (col("_Value"), lit(None)))
df_struct1.show(truncate = False)
感谢任何形式的帮助。
示例数据有 1 行干净,1 行 None
,1 行 ''
。
from pyspark.sql import function as F
df_struct.show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
|123k| [WHITE,]|
|124k| [BLUE, ]|
+----+--------------+
如果你没有spark2.4,你可以使用array_contains
来检查空字符串。如果任何行中有 null,则 array_contains 的输出将为 null
,或者如果其中有空字符串 "",则输出将为 true
。然后,您可以 filter
在新的 boolean
列上,如下所示。
df.withColumn("boolean", F.array_contains("_value", ""))\
.filter(~((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+
您可以删除 ~
以获取包含 无或空字符串 .
的所有其他行
df.withColumn("boolean", F.array_contains("_value", ""))\
.filter(((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()
+----+--------+
| key| _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+
Spark2.4:
可以用高阶函数数组filter
取出None
或''
,然后比较size
在 数据框 filter
.
df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
.filter((F.size("_value2")==F.size("_value"))).drop("_value2").show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+
获取其他行,这些行有 Nones 或 '' 或两者。您可以将 ~
放在过滤器表达式的前面。
df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
.filter(~(F.size("_value2")==F.size("_value"))).drop("_value2").show()
+----+--------+
| key| _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+
您可以也可以使用高阶函数exists
.
df.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
.filter(~(F.col("boolean")==True)).drop("boolean").show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+
删除 ~
以获取包含 Nones
或 ""
的所有行:
df.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
.filter((F.col("boolean")==True)).drop("boolean").show()
+----+--------+
| key| _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+
我是 Pyspark 的新手。 通过使用下面的数据框,我如何根据“_Value”字段将它分成两个不同的数据框。如果 _Value 是 array(string) 中有任何 null 或空白元素,那么它应该转到一个数据帧,其余的到另一个数据帧。
+----+-----+-----+-------------------------------+-------------+--------------+
|key |Size |Color|AdditionalAttributeMetric |_Name |_Value |
+----+-----+-----+-------------------------------+-------------+--------------+
|123k|BLACK|black|[Size -> BLACK, Color -> black]|[Size, Color]|[BLACK, black]|
|123k|WHITE|null |[Size -> WHITE, Color ->] |[Size, Color]|[WHITE,] |
+----+-----+-----+-------------------------------+-------------+--------------+
下面是完整的代码,但是它抛出了一个错误"Column is not iterable"。
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import lit, col, create_map
from itertools import chain
rdd = sc.parallelize([('123k', 'BLACK', 'black'),
('123k', 'WHITE', None)
])
schema = StructType([StructField('key', StringType(), True),
StructField('Size', StringType(), True),
StructField('Color', StringType(), True)])
df_struct = sqlContext.createDataFrame(rdd, schema)
df_struct_subattri = df_struct.select("Size", "Color")
AdditionalAttributeMetric = create_map(
list(chain(*((lit(name), col(name)) for name in df_struct_subattri.columns)))).alias(
"AdditionalAttributeMetric")
df_struct = df_struct.select("*", AdditionalAttributeMetric)
df_struct = df_struct.withColumn("_Name", map_keys(col("AdditionalAttributeMetric")))
df_struct = df_struct.withColumn("_Value", map_values(col("AdditionalAttributeMetric")))
df_struct1 = df_struct.select("*").where(array_contains (col("_Value"), '') | array_contains (col("_Value"), lit(None)))
df_struct1.show(truncate = False)
感谢任何形式的帮助。
示例数据有 1 行干净,1 行 None
,1 行 ''
。
from pyspark.sql import function as F
df_struct.show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
|123k| [WHITE,]|
|124k| [BLUE, ]|
+----+--------------+
如果你没有spark2.4,你可以使用array_contains
来检查空字符串。如果任何行中有 null,则 array_contains 的输出将为 null
,或者如果其中有空字符串 "",则输出将为 true
。然后,您可以 filter
在新的 boolean
列上,如下所示。
df.withColumn("boolean", F.array_contains("_value", ""))\
.filter(~((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+
您可以删除 ~
以获取包含 无或空字符串 .
df.withColumn("boolean", F.array_contains("_value", ""))\
.filter(((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()
+----+--------+
| key| _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+
Spark2.4:
可以用高阶函数数组filter
取出None
或''
,然后比较size
在 数据框 filter
.
df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
.filter((F.size("_value2")==F.size("_value"))).drop("_value2").show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+
获取其他行,这些行有 Nones 或 '' 或两者。您可以将 ~
放在过滤器表达式的前面。
df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
.filter(~(F.size("_value2")==F.size("_value"))).drop("_value2").show()
+----+--------+
| key| _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+
您可以也可以使用高阶函数exists
.
df.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
.filter(~(F.col("boolean")==True)).drop("boolean").show()
+----+--------------+
| key| _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+
删除 ~
以获取包含 Nones
或 ""
的所有行:
df.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
.filter((F.col("boolean")==True)).drop("boolean").show()
+----+--------+
| key| _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+