根据分隔符拆分字符串列并为 Pyspark 中的每个值创建列
Split string column based on delimiter and create columns for each value in Pyspark
我有 1000 多个文件,数据格式如下:
a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23
我想读取它并将其转换为数据框,如下所示:
clm1|clm2|clm3|clm4|clm5|clm6|clm7
a|b|c|1|3|null|null
a|b|c|9|null|60|23
我试过以下方法:
files = [f for f in glob.glob(pathToFile + "/**/*.txt.gz", recursive=True)]
df = spark.read.load(files, format='csv', sep = '|', header=None)
但它给了我以下结果:
clm1, clm2, clm3, clm4, clm5
a, b, c, 1, 3
a, b, c, 9, null
对于 Spark 2.4+,您可以将文件作为单个列读取,然后按 |
拆分。您将获得一个可以使用 higher-order functions:
进行转换的数组列
df.show(truncate=False)
+----------------------------+
|clm |
+----------------------------+
|a|b|c|clm4=1|clm5=3 |
|a|b|c|clm4=9|clm6=60|clm7=23|
+----------------------------+
我们使用 transform
函数将我们从 clm
列拆分得到的字符串数组转换为结构数组。
每个结构包含列名(如果存在)(检查字符串是否包含 =
)或将其命名为 clm + (i+1)
,其中 i
是其位置。
transform_expr = """
transform(split(clm, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)),
substring_index(x, '=', -1)
)
)
"""
现在使用map_from_entries
将数组转换为映射。最后,展开地图并旋转以获得您的列
df.select("clm",
explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")
) \
.groupby("clm").pivot('col_name').agg(first('col_value')) \
.drop("clm") \
.show(truncate=False)
给出:
+----+----+----+----+----+----+----+
|clm1|clm2|clm3|clm4|clm5|clm6|clm7|
+----+----+----+----+----+----+----+
|a |b |c |9 |null|60 |23 |
|a |b |c |1 |3 |null|null|
+----+----+----+----+----+----+----+
我有 1000 多个文件,数据格式如下:
a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23
我想读取它并将其转换为数据框,如下所示:
clm1|clm2|clm3|clm4|clm5|clm6|clm7
a|b|c|1|3|null|null
a|b|c|9|null|60|23
我试过以下方法:
files = [f for f in glob.glob(pathToFile + "/**/*.txt.gz", recursive=True)]
df = spark.read.load(files, format='csv', sep = '|', header=None)
但它给了我以下结果:
clm1, clm2, clm3, clm4, clm5
a, b, c, 1, 3
a, b, c, 9, null
对于 Spark 2.4+,您可以将文件作为单个列读取,然后按 |
拆分。您将获得一个可以使用 higher-order functions:
df.show(truncate=False)
+----------------------------+
|clm |
+----------------------------+
|a|b|c|clm4=1|clm5=3 |
|a|b|c|clm4=9|clm6=60|clm7=23|
+----------------------------+
我们使用 transform
函数将我们从 clm
列拆分得到的字符串数组转换为结构数组。
每个结构包含列名(如果存在)(检查字符串是否包含 =
)或将其命名为 clm + (i+1)
,其中 i
是其位置。
transform_expr = """
transform(split(clm, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)),
substring_index(x, '=', -1)
)
)
"""
现在使用map_from_entries
将数组转换为映射。最后,展开地图并旋转以获得您的列
df.select("clm",
explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")
) \
.groupby("clm").pivot('col_name').agg(first('col_value')) \
.drop("clm") \
.show(truncate=False)
给出:
+----+----+----+----+----+----+----+
|clm1|clm2|clm3|clm4|clm5|clm6|clm7|
+----+----+----+----+----+----+----+
|a |b |c |9 |null|60 |23 |
|a |b |c |1 |3 |null|null|
+----+----+----+----+----+----+----+