如何将字符串数组转换为带条件的结构数组
How to convert array of strings into array of struct with conditions
我有一个单列的 pyspark 数据框 _c0
。
a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23
我正在尝试将其转换为像这样 selected 列的数据框
clm1,clm2,clm3,clm4,clm6,clm7,clm8
a, b, c, 1, null,null,null
a, b, c, 9, 60, 23, null
请注意,我删除了 clm5
并添加了 clm8
。
我正在使用以下代码:
transform_expr = """
transform(split(_c0, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('_c0', i+1)),
substring_index(x, '=', -1)
)
)
"""
df = df.select("_c0", explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")
问题是我有多个大文件,我想对其执行此操作,每个文件的结果应包含相同的列(这也是一个长列表),如果输入中不存在,则这些列可以具有空值文件。我如何才能将条件添加到上述代码中 select 只有列名称列表中存在的那些列?
您可以在列表中包含所需的列并使用它来过滤转换后的数组:
column_list = ["clm1", "clm2", "clm3", "clm4", "clm6", "clm7", "clm8"]
现在使用 filter
函数在转换步骤之后添加此过滤器:
column_filter = ','.join(f"'{c}'" for c in column_list)
transform_expr = f"""
filter(transform(split(_c0, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)) as name,
substring_index(x, '=', -1) as value
)
), x -> x.name in ({column_filter}))
"""
这将过滤掉列表中不存在的所有列。
最后,使用简单的 select 表达式将缺失的列添加为空值:
df = df.select("_c0", explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")
## add missing columns as nulls
final_columns = [col(c).alias(c) if c in df.columns else lit(None).alias(c) for c in column_list]
df.select(*final_columns).show()
#+----+----+----+----+----+----+----+
#|clm1|clm2|clm3|clm4|clm6|clm7|clm8|
#+----+----+----+----+----+----+----+
#| a| b| c| 9| 60| 23|null|
#| a| b| c| 1|null|null|null|
#+----+----+----+----+----+----+----+
我有一个单列的 pyspark 数据框 _c0
。
a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23
我正在尝试将其转换为像这样 selected 列的数据框
clm1,clm2,clm3,clm4,clm6,clm7,clm8
a, b, c, 1, null,null,null
a, b, c, 9, 60, 23, null
请注意,我删除了 clm5
并添加了 clm8
。
我正在使用以下代码:
transform_expr = """
transform(split(_c0, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('_c0', i+1)),
substring_index(x, '=', -1)
)
)
"""
df = df.select("_c0", explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")
问题是我有多个大文件,我想对其执行此操作,每个文件的结果应包含相同的列(这也是一个长列表),如果输入中不存在,则这些列可以具有空值文件。我如何才能将条件添加到上述代码中 select 只有列名称列表中存在的那些列?
您可以在列表中包含所需的列并使用它来过滤转换后的数组:
column_list = ["clm1", "clm2", "clm3", "clm4", "clm6", "clm7", "clm8"]
现在使用 filter
函数在转换步骤之后添加此过滤器:
column_filter = ','.join(f"'{c}'" for c in column_list)
transform_expr = f"""
filter(transform(split(_c0, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)) as name,
substring_index(x, '=', -1) as value
)
), x -> x.name in ({column_filter}))
"""
这将过滤掉列表中不存在的所有列。
最后,使用简单的 select 表达式将缺失的列添加为空值:
df = df.select("_c0", explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")
## add missing columns as nulls
final_columns = [col(c).alias(c) if c in df.columns else lit(None).alias(c) for c in column_list]
df.select(*final_columns).show()
#+----+----+----+----+----+----+----+
#|clm1|clm2|clm3|clm4|clm6|clm7|clm8|
#+----+----+----+----+----+----+----+
#| a| b| c| 9| 60| 23|null|
#| a| b| c| 1|null|null|null|
#+----+----+----+----+----+----+----+