基于值的 PySpark 字符串列分解
PySpark string column breakup based on values
我有一个像这样的 pyspark 数据框,
+---+-----+
| id| info|
+---+-----+
| 1|--XX-|
| 2|XX--X|
+---+-----+
info栏目基本编码了2018-01开始每个月的信息
我想根据每个项目拆分数据框并创建它的融合版本,每个月添加为一行。
预期数据如下所示,
+---+-------+----+
| id|monthid|info|
+---+-------+----+
| 1| 201801| -|
| 1| 201802| -|
| 1| 201803| X|
| 1| 201804| X|
| 1| 201805| -|
| 2| 201801| X|
| 2| 201802| X|
| 2| 201803| -|
| 2| 201804| -|
| 2| 201805| X|
+---+-------+----+
我的数据很大,我想避免任何循环。我可以想出一种在 PySpark 中轻松执行此操作的方法。
借助@wwnde 的绝妙解决方案,我能够为我分享的示例数据生成结果。但是当数据信息超过 12 项时,事情就变得棘手了。
查看下面的示例数据和预期结果,
+---+-------------+
| id| info|
+---+-------------+
| 1|--XX---------|
| 2| XX--X|
+---+-------------+
预期结果,
+---+-------+----+
| id|monthid|info|
+---+-------+----+
| 1| 201801| -|
| 1| 201802| -|
| 1| 201803| X|
| 1| 201804| X|
| 1| 201805| -|
| 1| 201806| -|
| 1| 201807| -|
| 1| 201808| -|
| 1| 201809| -|
| 1| 201810| -|
| 1| 201811| -|
| 1| 201812| -|
| 1| 201901| -|
| 1| 201902| X|
| 2| 201801| X|
| 2| 201802| X|
| 2| 201803| -|
| 2| 201804| -|
| 2| 201805| X|
+---+-------+----+
基本上,monthid 列应该跟在日历月之后。
df=(
#replace each character in info with itself followed by comma
#Use the comma to split it into an array
#posexplode the array
df.select('id', posexplode(split(regexp_replace(col('info'),r'(?<=.{1})', r','),'\,')))
#Compute the info by adding 201801 to pos
.withColumn('pos',lit(201801)+col('pos')).filter(col('col')!="")
#Rename columns
.withColumnRenamed("pos","monthid") \
.withColumnRenamed("col","info")
).show()
+---+-------+----+
| id|monthid|info|
+---+-------+----+
| 1| 201801| -|
| 1| 201802| -|
| 1| 201803| X|
| 1| 201804| X|
| 1| 201805| -|
| 2| 201801| X|
| 2| 201802| X|
| 2| 201803| -|
| 2| 201804| -|
| 2| 201805| X|
+---+-------+----+
根据@wwnde 提供的答案,我能够将其扩展到我使用日历月的完整用例。
关键是使用 add_months 添加 month 和 pos 列,如果是后处理的话。代码是这样的,
(
#replace each character in info with itself followed by comma
#Use the comma to split it into an array
#posexplode the array
df.select('id', F.posexplode(F.split(F.regexp_replace(F.col('info'),r'(?<=.{1})', r','),'\,')))
#Create a dummy column with start date as timestamp and remove empty items from col
.withColumn("month", F.to_timestamp(F.lit("2018-01-01"))) \
.filter(F.col('col')!="")
# use add_months and add the month and pos columns to get actual calander month
.withColumn('month', F.expr("add_months(month, pos)")) \
# extract year, month and left pad month with 0 to handle 1 digit months (1-9)
.withColumn('monthid', F.concat(F.year('month'), F.lpad(F.month('month'), 2, '0')))
#Rename columns
.withColumnRenamed("col","info") \
.drop("month", "pos")
).show()
+---+----+-------+
| id|info|monthid|
+---+----+-------+
| 1| -| 201801|
| 1| -| 201802|
| 1| X| 201803|
| 1| X| 201804|
| 1| -| 201805|
| 1| -| 201806|
| 1| -| 201807|
| 1| -| 201808|
| 1| -| 201809|
| 1| -| 201810|
| 1| -| 201811|
| 1| -| 201812|
| 1| -| 201901|
| 1| X| 201902|
| 2| X| 201801|
| 2| X| 201802|
| 2| -| 201803|
| 2| -| 201804|
| 2| X| 201805|
+---+----+-------+
我有一个像这样的 pyspark 数据框,
+---+-----+
| id| info|
+---+-----+
| 1|--XX-|
| 2|XX--X|
+---+-----+
info栏目基本编码了2018-01开始每个月的信息
我想根据每个项目拆分数据框并创建它的融合版本,每个月添加为一行。
预期数据如下所示,
+---+-------+----+
| id|monthid|info|
+---+-------+----+
| 1| 201801| -|
| 1| 201802| -|
| 1| 201803| X|
| 1| 201804| X|
| 1| 201805| -|
| 2| 201801| X|
| 2| 201802| X|
| 2| 201803| -|
| 2| 201804| -|
| 2| 201805| X|
+---+-------+----+
我的数据很大,我想避免任何循环。我可以想出一种在 PySpark 中轻松执行此操作的方法。
借助@wwnde 的绝妙解决方案,我能够为我分享的示例数据生成结果。但是当数据信息超过 12 项时,事情就变得棘手了。
查看下面的示例数据和预期结果,
+---+-------------+
| id| info|
+---+-------------+
| 1|--XX---------|
| 2| XX--X|
+---+-------------+
预期结果,
+---+-------+----+
| id|monthid|info|
+---+-------+----+
| 1| 201801| -|
| 1| 201802| -|
| 1| 201803| X|
| 1| 201804| X|
| 1| 201805| -|
| 1| 201806| -|
| 1| 201807| -|
| 1| 201808| -|
| 1| 201809| -|
| 1| 201810| -|
| 1| 201811| -|
| 1| 201812| -|
| 1| 201901| -|
| 1| 201902| X|
| 2| 201801| X|
| 2| 201802| X|
| 2| 201803| -|
| 2| 201804| -|
| 2| 201805| X|
+---+-------+----+
基本上,monthid 列应该跟在日历月之后。
df=(
#replace each character in info with itself followed by comma
#Use the comma to split it into an array
#posexplode the array
df.select('id', posexplode(split(regexp_replace(col('info'),r'(?<=.{1})', r','),'\,')))
#Compute the info by adding 201801 to pos
.withColumn('pos',lit(201801)+col('pos')).filter(col('col')!="")
#Rename columns
.withColumnRenamed("pos","monthid") \
.withColumnRenamed("col","info")
).show()
+---+-------+----+
| id|monthid|info|
+---+-------+----+
| 1| 201801| -|
| 1| 201802| -|
| 1| 201803| X|
| 1| 201804| X|
| 1| 201805| -|
| 2| 201801| X|
| 2| 201802| X|
| 2| 201803| -|
| 2| 201804| -|
| 2| 201805| X|
+---+-------+----+
根据@wwnde 提供的答案,我能够将其扩展到我使用日历月的完整用例。
关键是使用 add_months 添加 month 和 pos 列,如果是后处理的话。代码是这样的,
(
#replace each character in info with itself followed by comma
#Use the comma to split it into an array
#posexplode the array
df.select('id', F.posexplode(F.split(F.regexp_replace(F.col('info'),r'(?<=.{1})', r','),'\,')))
#Create a dummy column with start date as timestamp and remove empty items from col
.withColumn("month", F.to_timestamp(F.lit("2018-01-01"))) \
.filter(F.col('col')!="")
# use add_months and add the month and pos columns to get actual calander month
.withColumn('month', F.expr("add_months(month, pos)")) \
# extract year, month and left pad month with 0 to handle 1 digit months (1-9)
.withColumn('monthid', F.concat(F.year('month'), F.lpad(F.month('month'), 2, '0')))
#Rename columns
.withColumnRenamed("col","info") \
.drop("month", "pos")
).show()
+---+----+-------+
| id|info|monthid|
+---+----+-------+
| 1| -| 201801|
| 1| -| 201802|
| 1| X| 201803|
| 1| X| 201804|
| 1| -| 201805|
| 1| -| 201806|
| 1| -| 201807|
| 1| -| 201808|
| 1| -| 201809|
| 1| -| 201810|
| 1| -| 201811|
| 1| -| 201812|
| 1| -| 201901|
| 1| X| 201902|
| 2| X| 201801|
| 2| X| 201802|
| 2| -| 201803|
| 2| -| 201804|
| 2| X| 201805|
+---+----+-------+