Converting a PySpark DataFrame into a dictionary: result different than expected
Suppose I have the following PySpark DataFrame:
data = [
    ("USA", 20, 40, 60),
    ("India", 50, 40, 30),
    ("Nepal", 20, 50, 30),
    ("Ireland", 40, 60, 70),
    ("Norway", 50, 50, 60),
]
columns = ["country", "A", "B", "C"]
df = spark.createDataFrame(data=data,schema=columns)
To create a dictionary from it, I used the following approach:
import pyspark.sql.functions as F
list_test = [row.asDict() for row in df.collect()]
dict_test = {country['country']: country for country in list_test}
The result is:
{'USA': {'country': 'USA', 'A': 20, 'B': 40, 'C': 60}, 'India': {'country': 'India', 'A': 50, 'B': 40, 'C': 30}, 'Nepal': {'country': 'Nepal', 'A': 20, 'B': 50, 'C': 30}, 'Ireland': {'country': 'Ireland', 'A': 40, 'B': 60, 'C': 70}, 'Norway': {'country': 'Norway', 'A': 50, 'B': 50, 'C': 60}}
However, what I want is:
{'USA': {'A': 20, 'B': 40, 'C': 60}, 'India': {'A': 50, 'B': 40, 'C': 30}, 'Nepal': {'A': 20, 'B': 50, 'C': 30}, 'Ireland': {'A': 40, 'B': 60, 'C': 70}, 'Norway': {'A': 50, 'B': 50, 'C': 60}}
How can I get this? I'm not sure I understand what I'm doing wrong.
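Your code maps each country to the whole row dict, so the 'country' key is still in it. You can use a nested dict comprehension to drop the unwanted key: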
list_test = [row.asDict() for row in df.collect()]
dict_test = {country['country']: {k:v for k,v in country.items() if k != 'country'} for country in list_test}
print(dict_test)
{'USA': {'A': 20, 'B': 40, 'C': 60}, 'India': {'A': 50, 'B': 40, 'C': 30}, 'Nepal': {'A': 20, 'B': 50, 'C': 30}, 'Ireland': {'A': 40, 'B': 60, 'C': 70}, 'Norway': {'A': 50, 'B': 50, 'C': 60}}
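The same idea can be wrapped in a small helper. This is only a minimal sketch, assuming the rows have already been collected and converted with row.asDict(). The name rows_to_nested_dict is hypothetical, and plain dicts stand in for the collected rows so the snippet runs without a Spark session:

```python
def rows_to_nested_dict(rows, key):
    """Map each row's `key` value to the rest of that row (hypothetical helper)."""
    result = {}
    for row in rows:
        rest = dict(row)        # shallow copy so the input row is not mutated
        name = rest.pop(key)    # remove the key column and keep its value
        result[name] = rest
    return result

# Plain dicts standing in for [row.asDict() for row in df.collect()]
list_test = [
    {"country": "USA", "A": 20, "B": 40, "C": 60},
    {"country": "India", "A": 50, "B": 40, "C": 30},
]
dict_test = rows_to_nested_dict(list_test, "country")
print(dict_test)
# {'USA': {'A': 20, 'B': 40, 'C': 60}, 'India': {'A': 50, 'B': 40, 'C': 30}}
```

If two rows share the same key value, the later row silently overwrites the earlier one, as with any dict.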
Here is another way: after a few transformations, collect a JSON string directly from the DataFrame, then use json.loads to get the dict object:
import json
from pyspark.sql.functions import to_json, collect_list, struct, map_from_arrays
dict_test = json.loads(
    df.groupBy().agg(
        collect_list("country").alias("countries"),
        collect_list(struct("A", "B", "C")).alias("values")
    ).select(
        to_json(map_from_arrays("countries", "values")).alias("json_str")
    ).collect()[0].json_str
)
print(dict_test)
#{'USA': {'A': 20, 'B': 40, 'C': 60}, 'India': {'A': 50, 'B': 40, 'C': 30}, 'Nepal': {'A': 20, 'B': 50, 'C': 30}, 'Ireland': {'A': 40, 'B': 60, 'C': 70}, 'Norway': {'A': 50, 'B': 50, 'C': 60}}
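To see what that expression builds, here is a plain-Python analogue of the same steps, with no Spark session involved. It is only an illustration under stated assumptions: the lists countries and values are stand-ins for the two collect_list columns, dict(zip(...)) plays the role of map_from_arrays, and json.dumps plays the role of to_json.

```python
import json

# Stand-ins for the two aggregated array columns (illustrative data only)
countries = ["USA", "India"]
values = [{"A": 20, "B": 40, "C": 60}, {"A": 50, "B": 40, "C": 30}]

# Pair keys with values, serialize to a JSON string, then parse it back
json_str = json.dumps(dict(zip(countries, values)))
dict_test = json.loads(json_str)
print(dict_test)
# {'USA': {'A': 20, 'B': 40, 'C': 60}, 'India': {'A': 50, 'B': 40, 'C': 30}}
```

Either way, the whole result ends up as a single object on the driver, so both approaches are only suitable when the data fits in driver memory.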