基于值的 PySpark 字符串列分解

PySpark string column breakup based on values

我有一个像这样的 pyspark 数据框,

+---+-----+
| id| info|
+---+-----+
|  1|--XX-|
|  2|XX--X|
+---+-----+

info栏目基本编码了2018-01开始每个月的信息

我想根据每个项目拆分数据框并创建它的融合版本,每个月添加为一行。

预期数据如下所示,

+---+-------+----+
| id|monthid|info|
+---+-------+----+
|  1| 201801|   -|
|  1| 201802|   -|
|  1| 201803|   X|
|  1| 201804|   X|
|  1| 201805|   -|
|  2| 201801|   X|
|  2| 201802|   X|
|  2| 201803|   -|
|  2| 201804|   -|
|  2| 201805|   X|
+---+-------+----+

我的数据很大,我想避免任何循环。我可以想出一种在 PySpark 中轻松执行此操作的方法。

借助@wwnde 的绝妙解决方案,我能够为我分享的示例数据生成结果。但是当数据信息超过 12 项时,事情就变得棘手了。

查看下面的示例数据和预期结果,

+---+-------------+
| id|         info|
+---+-------------+
|  1|--XX---------|
|  2|        XX--X|
+---+-------------+

预期结果,

+---+-------+----+
| id|monthid|info|
+---+-------+----+
|  1| 201801|   -|
|  1| 201802|   -|
|  1| 201803|   X|
|  1| 201804|   X|
|  1| 201805|   -|
|  1| 201806|   -|
|  1| 201807|   -|
|  1| 201808|   -|
|  1| 201809|   -|
|  1| 201810|   -|
|  1| 201811|   -|
|  1| 201812|   -|
|  1| 201901|   -|
|  1| 201902|   X|
|  2| 201801|   X|
|  2| 201802|   X|
|  2| 201803|   -|
|  2| 201804|   -|
|  2| 201805|   X|
+---+-------+----+

基本上,monthid 列应该跟在日历月之后。

df=(
     #replace each character in info with itself followed by comma
    #Use the comma to split it into an array
    #posexplode the array
    df.select('id', posexplode(split(regexp_replace(col('info'),r'(?<=.{1})', r','),'\,')))
    #Compute the info by adding 201801 to pos
    .withColumn('pos',lit(201801)+col('pos')).filter(col('col')!="")
    #Rename columns
    .withColumnRenamed("pos","monthid") \
    .withColumnRenamed("col","info")
    
   ).show()

+---+-------+----+
| id|monthid|info|
+---+-------+----+
|  1| 201801|   -|
|  1| 201802|   -|
|  1| 201803|   X|
|  1| 201804|   X|
|  1| 201805|   -|
|  2| 201801|   X|
|  2| 201802|   X|
|  2| 201803|   -|
|  2| 201804|   -|
|  2| 201805|   X|
+---+-------+----+

根据@wwnde 提供的答案,我能够将其扩展到我使用日历月的完整用例。

关键是使用 add_months 添加 month 和 pos 列,如果是后处理的话。代码是这样的,

(
     #replace each character in info with itself followed by comma
    #Use the comma to split it into an array
    #posexplode the array
    df.select('id', F.posexplode(F.split(F.regexp_replace(F.col('info'),r'(?<=.{1})', r','),'\,')))
    #Create a dummy column with start date as timestamp and remove empty items from col
    .withColumn("month", F.to_timestamp(F.lit("2018-01-01"))) \
    .filter(F.col('col')!="")
    # use add_months and add the month and pos columns to get actual calander month
    .withColumn('month', F.expr("add_months(month, pos)")) \
    # extract year, month and left pad month with 0 to handle 1 digit months (1-9)
    .withColumn('monthid', F.concat(F.year('month'), F.lpad(F.month('month'), 2, '0')))
    #Rename columns
    .withColumnRenamed("col","info") \
    .drop("month", "pos")
   ).show()


+---+----+-------+
| id|info|monthid|
+---+----+-------+
|  1|   -| 201801|
|  1|   -| 201802|
|  1|   X| 201803|
|  1|   X| 201804|
|  1|   -| 201805|
|  1|   -| 201806|
|  1|   -| 201807|
|  1|   -| 201808|
|  1|   -| 201809|
|  1|   -| 201810|
|  1|   -| 201811|
|  1|   -| 201812|
|  1|   -| 201901|
|  1|   X| 201902|
|  2|   X| 201801|
|  2|   X| 201802|
|  2|   -| 201803|
|  2|   -| 201804|
|  2|   X| 201805|
+---+----+-------+