在将大型 JSON 文件转换为 JSON 之前,如何使用 AWS glueContext 拆分/分块文件?

How do I split / chunk Large JSON Files with AWS glueContext before converting them to JSON?

我正在尝试使用 AWS Glue 将 20GB JSON gzip 文件转换为 parquet。

我已经使用 Pyspark 和下面的代码设置了一个作业。

我收到这条日志警告消息:

LOG.WARN: Loading one large unsplittable file s3://aws-glue-data.json.gz with only one partition, because the file is compressed by unsplittable compression codec.

我想知道是否有办法拆分/分块文件?我知道我可以用 pandas 做到这一点,但不幸的是,这需要太长时间(12 小时以上)。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
import pyspark.sql.functions
from pyspark.sql.functions import col, concat, reverse, translate
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

test = glueContext.create_dynamic_frame_from_catalog(
             database="test_db",
             table_name="aws-glue-test_table")


# Create Spark DataFrame, remove timestamp field and re-name other fields
reconfigure = test.drop_fields(['timestamp']).rename_field('name', 'FirstName').rename_field('LName', 'LastName').rename_field('type', 'record_type')

# Create pyspark DF
spark_df = reconfigure.toDF()
# Filter and only return 'a' record types 
spark_df = spark_df.where("record_type == 'a'")
# Once filtered, remove the record_type column
spark_df = spark_df.drop('record_type')
spark_df = spark_df.withColumn("LastName", translate("LastName", "LName:", ""))
spark_df = spark_df.withColumn("FirstName", reverse("FirstName"))

spark_df.write.parquet("s3a://aws-glue-bucket/parquet/test.parquet")

Spark 不会并行读取单个 gzip 文件。但是,您可以将其分成块。

此外,Spark 在读取 gzip 文件时真的很慢(因为它没有并行化)。您可以这样做来加快速度:

file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda _: gzip.open(_).readlines())