I need to append only the columns that have non-null values in a PySpark DataFrame
My PySpark DataFrame (df) has the following sample table (table1):
id, col1, col2, col3
1, abc, null, def
2, null, def, abc
3, def, abc, null
I am trying to get a new column (final) by appending all of the columns while ignoring the null values.
I have tried PySpark code using f.array(col1, col2, col3). The values get appended, but it does not ignore the nulls. I have also tried a UDF to append only the non-null columns, but it did not work.
import pyspark.sql.functions as f
df = spark.table('table1')
df = df.withColumn('final', f.array('col1', 'col2', 'col3'))
Actual result:
id, col1, col2, col3, final
1, abc, null, def, [abc,,def]
2, null, def, abc, [,def, abc]
3, def, abc, null, [def,abc,]
Expected result:
id, col1, col2, col3, final
1, abc, null, def, [abc,def]
2, null, def, abc, [def, abc]
3, def, abc, null, [def,abc]
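For reference, a minimal sketch that reproduces this sample data (plain strings are used here for brevity, even though the real columns are structs per the schema below; an existing SparkSession named spark is assumed):

from pyspark.sql import Row

# stands in for spark.table('table1') in the snippets below
df = spark.createDataFrame([
    Row(id=1, col1='abc', col2=None, col3='def'),
    Row(id=2, col1=None, col2='def', col3='abc'),
    Row(id=3, col1='def', col2='abc', col3=None),
])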
My col1, col2, col3 schemas are as below,
where col1 is named applications:
applications: struct (nullable = false)
    applicationid: string (nullable = true)
    createdat: string (nullable = true)
    updatedat: string (nullable = true)
    source_name: string (nullable = true)
    status: string (nullable = true)
    creditbureautypeid: string (nullable = true)
    score: integer (nullable = true)
    applicationcreditreportid: string (nullable = true)
    firstname: string (nullable = false)
    lastname: string (nullable = false)
    dateofbirth: string (nullable = false)
    accounts: array (nullable = true)
        element: struct (containsNull = true)
            applicationcreditreportaccountid: string (nullable = true)
            currentbalance: integer (nullable = true)
            institutionid: string (nullable = true)
            accounttypeid: string (nullable = true)
            dayspastdue: integer (nullable = true)
            institution_name: string (nullable = true)
            account_type_name: string (nullable = true)
Please let me know if the question is unclear or if more information is needed.
Any help would be appreciated. :)
You can define your own UDF as follows:
def only_not_null(st, nd, rd):
    return [x for x in locals().values() if x is not None]  # keep only the non-empty columns
Then call it:
from pyspark.sql.types import ArrayType, StringType
df = spark.table('table1')
df = df.withColumn('final', f.udf(only_not_null, ArrayType(StringType()))('col1', 'col2', 'col3'))
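If you prefer not to rely on locals() (whose ordering matches the parameter order in CPython but reads a little obscurely), an equivalent sketch that names the arguments explicitly:

def only_not_null(st, nd, rd):
    # same behaviour: drop the null cells, keep the rest in order
    return [x for x in (st, nd, rd) if x is not None]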
Using a UDF:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import ArrayType, StringType

def join_columns(row_list):
    return [cell_val for cell_val in row_list if cell_val is not None]

# declare the return type so Spark produces a real array column instead of a string
join_udf = udf(join_columns, ArrayType(StringType()))
df = spark.table('table1')
df = df.withColumn('final', join_udf(array('col1', 'col2', 'col3')))
This works for any number of columns, not just 3; just edit the columns passed to array, as sketched below.
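For instance, a minimal sketch that passes every column except id instead of hard-coding the three names (the column names from the sample table are assumed):

value_cols = [c for c in df.columns if c != 'id']  # col1, col2, col3 in the sample table
df = df.withColumn('final', join_udf(array(*value_cols)))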
Since Spark 2.4 you can do this with higher-order functions (no UDF needed). In PySpark the query could look like this:
import pyspark.sql.functions as f

result = (
    df
    .withColumn("temp", f.array("col1", "col2", "col3"))
    .withColumn("final", f.expr("FILTER(temp, x -> x is not null)"))
    .drop("temp")
)
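If you are on Spark 3.1 or later, the same filter can also be written with the Python column API instead of an SQL expression (a sketch of that variant, using the same f alias as above):

result = df.withColumn(
    "final",
    f.filter(f.array("col1", "col2", "col3"), lambda x: x.isNotNull()),
)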