pyspark: turn array of dicts into new columns
I'm struggling to transform my pyspark dataframe, which looks like this:
df = spark.createDataFrame([('0018aad4',[300, 450], ['{"v1": "blue"}', '{"v2": "red"}']), ('0018aad5',[300], ['{"v1": "blue"}'])],[ "id","Tlist", 'Tstring'])
df.show(2, False)
+--------+----------+-------------------------------+
|id |Tlist |Tstring |
+--------+----------+-------------------------------+
|0018aad4|[300, 450]|[{"v1": "blue"}, {"v2": "red"}]|
|0018aad5|[300] |[{"v1": "blue"}] |
+--------+----------+-------------------------------+
into this:
df_result = spark.createDataFrame([('0018aad4',[300, 450], 'blue', 'red'), ('0018aad5',[300], 'blue', None)],[ "id","Tlist", 'v1', 'v2'])
df_result.show(2, False)
+--------+----------+----+----+
|id |Tlist |v1 |v2 |
+--------+----------+----+----+
|0018aad4|[300, 450]|blue|red |
|0018aad5|[300] |blue|null|
+--------+----------+----+----+
I tried pivoting and a few other things, but did not get the result above.
Note that the number of dicts in the Tstring column is not fixed (it varies per row).
Do you know how I could do this?
Using the transform function, you can convert each element of the array into a map type. Then, using the aggregate function, you can merge them into a single map, explode it, and pivot the keys to get the desired output:
from pyspark.sql import functions as F
df1 = df.withColumn(
    "Tstring",
    F.transform("Tstring", lambda x: F.from_json(x, "map<string,string>"))
).withColumn(
    "Tstring",
    F.aggregate(
        F.expr("slice(Tstring, 2, size(Tstring))"),
        F.col("Tstring")[0],
        lambda acc, x: F.map_concat(acc, x)
    )
).select(
    "id", "Tlist", F.explode("Tstring")
).groupby(
    "id", "Tlist"
).pivot("key").agg(F.first("value"))
df1.show()
#+--------+----------+----+----+
#|id |Tlist |v1 |v2 |
#+--------+----------+----+----+
#|0018aad4|[300, 450]|blue|red |
#|0018aad5|[300] |blue|null|
#+--------+----------+----+----+
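To make the steps easier to follow, here is a sketch (assuming the same df as in the question) that stops right after the transform and aggregate steps, before the explode and pivot:

from pyspark.sql import functions as F

# Sketch: stop after transform + aggregate to inspect the merged map column.
intermediate = df.withColumn(
    "Tstring",
    F.transform("Tstring", lambda x: F.from_json(x, "map<string,string>"))
).withColumn(
    "Tstring",
    F.aggregate(
        F.expr("slice(Tstring, 2, size(Tstring))"),
        F.col("Tstring")[0],
        lambda acc, x: F.map_concat(acc, x)
    )
)
intermediate.show(truncate=False)
# Each row now holds a single map (e.g. {v1 -> blue, v2 -> red} for the first id);
# explode turns that map into (key, value) rows and pivot spreads the keys into columns.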
I'm using Spark 3.1+, so higher-order functions such as transform are available in the DataFrame API, but you can do the same thing with expr for Spark < 3.1:
df1 = (df.withColumn("Tstring", F.expr("transform(Tstring, x-> from_json(x, 'map<string,string>'))"))
.withColumn("Tstring", F.expr("aggregate(slice(Tstring, 2, size(Tstring)), Tstring[0], (acc, x) -> map_concat(acc, x))"))
.select("id", "Tlist", F.explode("Tstring"))
.groupby("id", "Tlist")
.pivot("key")
.agg(F.first("value"))
)
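As a side note, an equivalent route (a sketch, assuming the same df) is to explode the array of JSON strings first and parse each element on its own, which avoids the aggregate/map_concat step:

from pyspark.sql import functions as F

# Sketch: explode the array of JSON strings, parse each element into a map,
# then explode the map and pivot its keys into columns.
df_alt = (df
          .select("id", "Tlist", F.explode("Tstring").alias("json_str"))
          .select("id", "Tlist", F.explode(F.from_json("json_str", "map<string,string>")))
          .groupby("id", "Tlist")
          .pivot("key")
          .agg(F.first("value"))
          )
df_alt.show()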
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        ('0018aad4', [{"val1": "blue", "val2": "red"}], [300, 500]),
        ('0018aad', [{"val1": "blue", "val2": "null"}], [300])
    ],
    ("ID", "List", "Tlist")
)
df2 = df.select(df.ID, explode(df.List).alias("Dict"), df.Tlist)
df2.withColumn("Val1", F.col("Dict").getItem("val1")) \
   .withColumn("Val2", F.col("Dict").getItem("val2")) \
   .show(truncate=False)
+--------+----------------------------+----------+----+----+
|ID |Dict |Tlist |Val1|Val2|
+--------+----------------------------+----------+----+----+
|0018aad4|{val2 -> red, val1 -> blue} |[300, 500]|blue|red |
|0018aad |{val2 -> null, val1 -> blue}|[300] |blue|null|
+--------+----------------------------+----------+----+----+
This is what you are looking for.
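Note that this assumes the column already holds maps. If, as in the question, the column holds JSON strings, a possible bridge (a sketch, assuming the question's df) is to parse and merge the elements first and then use the same getItem calls:

from pyspark.sql import functions as F

# Sketch: parse each JSON string into a map, fold them into one map per row,
# then read the known keys with getItem as above.
merged = df.withColumn(
    "Dict",
    F.aggregate(
        F.expr("slice(Tstring, 2, size(Tstring))"),
        F.from_json(F.col("Tstring")[0], "map<string,string>"),
        lambda acc, x: F.map_concat(acc, F.from_json(x, "map<string,string>"))
    )
)
merged.select(
    "id", "Tlist",
    F.col("Dict").getItem("v1").alias("v1"),
    F.col("Dict").getItem("v2").alias("v2")
).show(truncate=False)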
Slightly over-fitting this example (you may need to adapt it for any generalization), you can get the elements from the Tstring column by their index:
partial_results = df.withColumn("v1", df.Tstring[0]).withColumn("v2", df.Tstring[1])
+--------+----------+--------------+-------------+
| id| Tlist| v1| v2|
+--------+----------+--------------+-------------+
|0018aad4|[300, 450]|{"v1": "blue"}|{"v2": "red"}|
|0018aad5| [300]|{"v1": "blue"}| null|
+--------+----------+--------------+-------------+
With this, you can do some cleanup to reach the desired result:
from pyspark.sql.functions import regexp_replace
maximum_color_length = 100
wanted_df = df.withColumn(
    "v1",
    regexp_replace(df.Tstring[0].substr(9, maximum_color_length), r"\"\}", "")
).withColumn(
    "v2",
    regexp_replace(df.Tstring[1].substr(9, maximum_color_length), r"\"\}", "")
).drop(
    "Tstring"
)
+--------+----------+----+----+
| id| Tlist| v1| v2|
+--------+----------+----+----+
|0018aad4|[300, 450]|blue| red|
|0018aad5| [300]|blue|null|
+--------+----------+----+----+
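A less position-dependent variant of this cleanup (a sketch under the same assumptions) parses each indexed element with from_json and reads the expected key directly, instead of relying on fixed substring offsets:

from pyspark.sql import functions as F

# Sketch: parse each indexed element and read the expected key by name,
# rather than cutting the raw string at a fixed offset.
wanted_df = df.withColumn(
    "v1", F.from_json(df.Tstring[0], "map<string,string>").getItem("v1")
).withColumn(
    "v2", F.from_json(df.Tstring[1], "map<string,string>").getItem("v2")
).drop("Tstring")
wanted_df.show()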