如何更改地图数据类型中的值
How to change value in a Map Datatype
我有一个数据框,其中有一列类型为 MapType<StringType, StringType>
。
|-- identity: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
标识列包含键“更新”。
+-------------+
identity |
+-------+-----+
[update -> Y]|
[update -> Y]|
[update -> Y]|
[update -> Y]|
+-------+-----+
如何将键“更新”的值从“Y”更改为“N”?
我使用的是 2.3 版的 spark
如有任何帮助,我们将不胜感激。谢谢!
据我所知,在 spark 2.3 中没有处理地图的内置函数。唯一的办法大概就是设计一个UDF:
val df = Seq(Map(1 -> 2, 3 -> 4), Map(7 -> 8, 1 -> 6)).toDF("m")
// a function that sets the value "new" to all key equal to "1"
val fun = udf((m : Map[String, String]) =>
m.map{ case (key, value) => (key, if (key == "1") "new" else value) }
)
df.withColumn("m", fun('m)).show(false)
+------------------+
|m |
+------------------+
|{1 -> new, 3 -> 4}|
|{7 -> 8, 1 -> new}|
+------------------+
JSON解法
一种替代方法是展开地图,进行更新并重新聚合。不幸的是,在 spark 2.3 中无法根据动态数量的项目创建地图。但是,您可以将地图聚合为 json 字典,然后使用 from_json
函数。我很确定第一个解决方案会更有效率,但谁知道呢。不过在 pyspark 中,此解决方案可能比 UDF 更快。
df
.withColumn("id", monotonically_increasing_id)
.select($"id", explode('m))
.withColumn("value", when('key === "1" ,lit("new")).otherwise('value))
.withColumn("entry", concat(lit("\""), 'key, lit("\" : \""), 'value, lit("\"")))
.groupBy("id").agg( collect_list('entry) as "list")
.withColumn("json", concat(lit("{"), concat_ws(",", 'list), lit("}")))
.withColumn("m", from_json('json, MapType(StringType, StringType)))
.show(false)
结果与之前相同。
我有一个数据框,其中有一列类型为 MapType<StringType, StringType>
。
|-- identity: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
标识列包含键“更新”。
+-------------+
identity |
+-------+-----+
[update -> Y]|
[update -> Y]|
[update -> Y]|
[update -> Y]|
+-------+-----+
如何将键“更新”的值从“Y”更改为“N”?
我使用的是 2.3 版的 spark
如有任何帮助,我们将不胜感激。谢谢!
据我所知,在 spark 2.3 中没有处理地图的内置函数。唯一的办法大概就是设计一个UDF:
val df = Seq(Map(1 -> 2, 3 -> 4), Map(7 -> 8, 1 -> 6)).toDF("m")
// a function that sets the value "new" to all key equal to "1"
val fun = udf((m : Map[String, String]) =>
m.map{ case (key, value) => (key, if (key == "1") "new" else value) }
)
df.withColumn("m", fun('m)).show(false)
+------------------+
|m |
+------------------+
|{1 -> new, 3 -> 4}|
|{7 -> 8, 1 -> new}|
+------------------+
JSON解法
一种替代方法是展开地图,进行更新并重新聚合。不幸的是,在 spark 2.3 中无法根据动态数量的项目创建地图。但是,您可以将地图聚合为 json 字典,然后使用 from_json
函数。我很确定第一个解决方案会更有效率,但谁知道呢。不过在 pyspark 中,此解决方案可能比 UDF 更快。
df
.withColumn("id", monotonically_increasing_id)
.select($"id", explode('m))
.withColumn("value", when('key === "1" ,lit("new")).otherwise('value))
.withColumn("entry", concat(lit("\""), 'key, lit("\" : \""), 'value, lit("\"")))
.groupBy("id").agg( collect_list('entry) as "list")
.withColumn("json", concat(lit("{"), concat_ws(",", 'list), lit("}")))
.withColumn("m", from_json('json, MapType(StringType, StringType)))
.show(false)
结果与之前相同。