如何使用 pySpark 使多个 json 处理速度更快?
How to make multiple json processing faster using pySpark?
我在 Databricks 中有一个 json 文件列表,我想做的是阅读每个文件json,提取所需的值,然后将其附加到一个空的 pandas 数据框中。每个 json 文件对应于最终数据帧上的一行。初始 json 文件列表长度为 50k。到目前为止,我构建的是下面的函数,它完美地完成了工作,但它花费了太多时间,以至于我将 json 文件列表子集 5k 个箱子和 运行 个箱子分别分开。每个需要30分钟。我仅限于在 Databricks 中使用 3 节点集群 。
你们有机会提高我的功能的效率吗?提前致谢。
### Create a big dataframe including all json files ###
def jsons_to_pdf(all_paths):
# Create an empty pandas dataframes (it is defined only with column names)
pdf = create_initial_pdf(samplefile)
# Append each row into the above dataframe
for path in all_paths:
# Create a spark dataframe
sdf = sqlContext.read.json(path)
# Create a two extracted lists of values
init_values = sdf.select("id","logTimestamp","otherTimestamp").rdd.flatMap(lambda x: x).collect()
id_values = sdf.select(sdf["dataPoints"]["value"]).rdd.flatMap(lambda x: x).collect()[0]
#Append the concatenated list each one as a row into the initial dataframe
pdf.loc[len(pdf)] = init_values + id_values
return pdf
一个 json 文件如下所示:
而我想要实现的是将 dataPoints['id'] 作为新列,并将 dataPoints['value'] 作为它们的值,以便最终变成这样:
根据您的示例,您想要执行的是一个数据透视表,然后将您的数据转换为 pandas 数据框。
步骤是:
- 将所有 json 收集到 1 个大数据框中,
- 旋转你的数据,
- 将它们转换为 pandas 数据帧
尝试这样的事情:
from functools import reduce
def jsons_to_pdf(all_paths):
# Create a big dataframe from all the jsons
sdf = reduce(
lambda a,b : a.union(b),
[
sqlContext.read.json(path)
for path
in all_paths
]
)
# select and pivot your data
pivot_df = sdf.select(
"imoNo",
"logTimestamp",
"payloadTimestamp",
F.explode("datapoints").alias("datapoint")
).groupBy(
"imoNo",
"logTimestamp",
"payloadTimestamp",
).pivot(
"datapoint.id"
).sum("datapoint.value")
# convert to a pandas dataframe
pdf = pivot_df.toPandas()
return pdf
根据您的评论,您可以将文件列表 all_paths
替换为通用路径并更改创建方式 sdf
:
all_paths = 'abc/*/*/*' # 3x*, one for year, one for month, one for day
def jsons_to_pdf(all_paths):
# Create a big dataframe from all the jsons
sdf = sqlContext.read.json(path)
这肯定会提高性能。
我在 Databricks 中有一个 json 文件列表,我想做的是阅读每个文件json,提取所需的值,然后将其附加到一个空的 pandas 数据框中。每个 json 文件对应于最终数据帧上的一行。初始 json 文件列表长度为 50k。到目前为止,我构建的是下面的函数,它完美地完成了工作,但它花费了太多时间,以至于我将 json 文件列表子集 5k 个箱子和 运行 个箱子分别分开。每个需要30分钟。我仅限于在 Databricks 中使用 3 节点集群 。
你们有机会提高我的功能的效率吗?提前致谢。
### Create a big dataframe including all json files ###
def jsons_to_pdf(all_paths):
# Create an empty pandas dataframes (it is defined only with column names)
pdf = create_initial_pdf(samplefile)
# Append each row into the above dataframe
for path in all_paths:
# Create a spark dataframe
sdf = sqlContext.read.json(path)
# Create a two extracted lists of values
init_values = sdf.select("id","logTimestamp","otherTimestamp").rdd.flatMap(lambda x: x).collect()
id_values = sdf.select(sdf["dataPoints"]["value"]).rdd.flatMap(lambda x: x).collect()[0]
#Append the concatenated list each one as a row into the initial dataframe
pdf.loc[len(pdf)] = init_values + id_values
return pdf
一个 json 文件如下所示:
而我想要实现的是将 dataPoints['id'] 作为新列,并将 dataPoints['value'] 作为它们的值,以便最终变成这样:
根据您的示例,您想要执行的是一个数据透视表,然后将您的数据转换为 pandas 数据框。
步骤是:
- 将所有 json 收集到 1 个大数据框中,
- 旋转你的数据,
- 将它们转换为 pandas 数据帧
尝试这样的事情:
from functools import reduce
def jsons_to_pdf(all_paths):
# Create a big dataframe from all the jsons
sdf = reduce(
lambda a,b : a.union(b),
[
sqlContext.read.json(path)
for path
in all_paths
]
)
# select and pivot your data
pivot_df = sdf.select(
"imoNo",
"logTimestamp",
"payloadTimestamp",
F.explode("datapoints").alias("datapoint")
).groupBy(
"imoNo",
"logTimestamp",
"payloadTimestamp",
).pivot(
"datapoint.id"
).sum("datapoint.value")
# convert to a pandas dataframe
pdf = pivot_df.toPandas()
return pdf
根据您的评论,您可以将文件列表 all_paths
替换为通用路径并更改创建方式 sdf
:
all_paths = 'abc/*/*/*' # 3x*, one for year, one for month, one for day
def jsons_to_pdf(all_paths):
# Create a big dataframe from all the jsons
sdf = sqlContext.read.json(path)
这肯定会提高性能。