在 pyspark 中更新嵌套结构中的映射值

Update map value in nested struct in pyspark

我有一个 table 有日期和评论。

dob        | comment
---------------------------
1960-12-01 | this is useful

我想要一个这种类型的新专栏:

value_type = T.StructType(
    [
       T.StructField("extra",T.MapType(T.StringType(), T.StringType(), True), True),
       T.StructField("date", T.StringType(), True),
       T.StructField("from_date", T.StringType(), True),
       T.StructField("to_date", T.StringType(), True),
       T.StructField("value", T.StringType(), True),
    ]
)

我需要:

  1. 将 df.date 放入结构的 date 字段中
  2. 将 df.comment 放入结构的 extra 映射中

感谢 blackbishop,我想出了如何做第一部分 - 我尝试使用 .withField() 更新地图但它抛出错误:

我试过了:

(df
.withColumn("new_col", 
            F.struct(*[F.lit(None).cast(f.dataType).alias(f.name) 
                       for f in value_type.fields]))
.withColumn("new_col", (F.col("new_col")
                         .withField("date", F.col("dob"))
                         .withField("extra.value", F.col("comment")))))

但是我得到以下错误:

AnalysisException: cannot resolve 'update_fields(update_fields(new_col, WithField(dob), WithField(dob)).extra, WithField(dob))' due to data type mismatch: struct argument should be struct type, got: map<string,string>; 

我很困惑为什么它不能与结构内的映射一起使用?

谢谢:)

我想通了!

(df
.withColumn("new_col", 
            F.struct(*[F.lit(None).cast(f.dataType).alias(f.name) 
                       for f in value_type.fields]))
.withColumn("new_col", (F.col("new_col")
                         .withField("date", F.col("dob"))
                         .withField("extra", 
                                    F.create_map(F.lit("my_key"), F.col("comment")))))

问题是我实际上并没有将地图传递给地图类型!