加快 pyspark 解析大型嵌套 json 文件
Speed up pyspark parsing large nested json file
你好,我嵌套了 json 大小为 400 兆字节和 200k 的文件 records.I 创建了一个使用 pyspark 解析文件并存储在自定义数据框中的解决方案,但它需要大约 5-7分钟来完成这个非常慢的操作。
这是一个 json 文件的示例(小文件但与大文件具有相同的结构):
{"status":"success",
"data":{"resultType":"matrix","result":
[{"metric":{"data0":"T" ,"data1":"O"},"values":[[90,"0"],[80, "0"]]},
{"metric":{"data0":"K" ,"data1":"S"},"values":[[70,"0"],[60, "0"]]},
{"metric":{"data2":"J" ,"data3":"O"},"values":[[50,"0"],[40, "0"]]}]}}
这是我想要的输出数据帧的结构:
time | value |data0 | data1 | data2 | data3
90 | "0" | "T"| "O"| nan | nan
80 | "0" | "T"| "O"| nan | nan
70 | "0" | "K"| "S"| nan | nan
60 | "0" | "K"| "S"| nan | nan
50 | "0" | nan| nan| "J" | "O"
40 | "0" | nan| nan| "J" | "O"
这是我用来在大文件上生成上面列出的数据帧结构的 pyspark 代码:
from datetime import datetime
import json
import rapidjson
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
from util import schema ,meta_date
new_schema = StructType.fromJson(json.loads(schema))
with open("largefile.json", "r") as json_file:
result_count = len(rapidjson.load(json_file)["data"]["result"])
spark = SparkSession.builder.master("spark://IP").getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'),
('spark.executor.cores', '4'),
('spark.driver.memory', '4g'),
])
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("largefile.json")
for data_name in meta_date:
df = df.withColumn(
data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
)
df = (
df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(*[F.lit(x) for x in range(0, result_count)]))
.withColumn("items", F.explode(F.col("items")))
)
for data_name in meta_date:
df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))
df = (df.withColumn("values", F.col("values").getItem(F.col("items")))
.withColumn("values", F.explode("values"))
.withColumn("time", F.col("values").getItem(0))
.withColumn("value", F.col("values").getItem(1))
.drop("data", "status", "items", "values")).show()
我的机器有 4 个核心(8 个逻辑核心)和内存 16 gb。我正在使用具有主节点和 2 个工作节点集群的独立模式。
关于如何通过编辑集群配置或重构代码中的转换来加快此过程的任何帮助?
这个呢?阅读 json、select 列并展开,看起来与您想要的结果相匹配。
df.select(f.explode('data.result').alias('result')) \
.select('result.metric.*', f.explode('result.values').alias('values')) \
.withColumn('time', f.col('values')[0]) \
.withColumn('value', f.col('values')[1]) \
.drop('values') \
.show(truncate=False)
+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T |O |null |null |90 |0 |
|T |O |null |null |80 |0 |
|K |S |null |null |70 |0 |
|K |S |null |null |60 |0 |
|null |null |J |O |50 |0 |
|null |null |J |O |40 |0 |
+-----+-----+-----+-----+----+-----+
你好,我嵌套了 json 大小为 400 兆字节和 200k 的文件 records.I 创建了一个使用 pyspark 解析文件并存储在自定义数据框中的解决方案,但它需要大约 5-7分钟来完成这个非常慢的操作。
这是一个 json 文件的示例(小文件但与大文件具有相同的结构):
{"status":"success",
"data":{"resultType":"matrix","result":
[{"metric":{"data0":"T" ,"data1":"O"},"values":[[90,"0"],[80, "0"]]},
{"metric":{"data0":"K" ,"data1":"S"},"values":[[70,"0"],[60, "0"]]},
{"metric":{"data2":"J" ,"data3":"O"},"values":[[50,"0"],[40, "0"]]}]}}
这是我想要的输出数据帧的结构:
time | value |data0 | data1 | data2 | data3
90 | "0" | "T"| "O"| nan | nan
80 | "0" | "T"| "O"| nan | nan
70 | "0" | "K"| "S"| nan | nan
60 | "0" | "K"| "S"| nan | nan
50 | "0" | nan| nan| "J" | "O"
40 | "0" | nan| nan| "J" | "O"
这是我用来在大文件上生成上面列出的数据帧结构的 pyspark 代码:
from datetime import datetime
import json
import rapidjson
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
from util import schema ,meta_date
new_schema = StructType.fromJson(json.loads(schema))
with open("largefile.json", "r") as json_file:
result_count = len(rapidjson.load(json_file)["data"]["result"])
spark = SparkSession.builder.master("spark://IP").getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'),
('spark.executor.cores', '4'),
('spark.driver.memory', '4g'),
])
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("largefile.json")
for data_name in meta_date:
df = df.withColumn(
data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
)
df = (
df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(*[F.lit(x) for x in range(0, result_count)]))
.withColumn("items", F.explode(F.col("items")))
)
for data_name in meta_date:
df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))
df = (df.withColumn("values", F.col("values").getItem(F.col("items")))
.withColumn("values", F.explode("values"))
.withColumn("time", F.col("values").getItem(0))
.withColumn("value", F.col("values").getItem(1))
.drop("data", "status", "items", "values")).show()
我的机器有 4 个核心(8 个逻辑核心)和内存 16 gb。我正在使用具有主节点和 2 个工作节点集群的独立模式。
关于如何通过编辑集群配置或重构代码中的转换来加快此过程的任何帮助?
这个呢?阅读 json、select 列并展开,看起来与您想要的结果相匹配。
df.select(f.explode('data.result').alias('result')) \
.select('result.metric.*', f.explode('result.values').alias('values')) \
.withColumn('time', f.col('values')[0]) \
.withColumn('value', f.col('values')[1]) \
.drop('values') \
.show(truncate=False)
+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T |O |null |null |90 |0 |
|T |O |null |null |80 |0 |
|K |S |null |null |70 |0 |
|K |S |null |null |60 |0 |
|null |null |J |O |50 |0 |
|null |null |J |O |40 |0 |
+-----+-----+-----+-----+----+-----+