Too many open files in AWS Glue jobs

I have the following job script, which throws an error when there is a large number of files to process.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'ENVIRONMENT', 'WORK_BUCKET_NAME', 'OUTPUT_BUCKET_NAME'])

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
env = args['ENVIRONMENT']
work_bucket_name = args['WORK_BUCKET_NAME']
output_bucket_name = args['OUTPUT_BUCKET_NAME']

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = f"{env}_raw_edocs", table_name = "esocial_s_2200", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("esocial", "string", "esocial", "string"), ("tenant", "string", "tenant", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

unbox4 = Unbox.apply(frame = dropnullfields3, path = "esocial", format = "json")

relationalize5 = Relationalize.apply(frame = unbox4, staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200", name = "root", transformation_ctx = "relationalize5")

if len(relationalize5.select('root').toDF().schema) > 0:
    datasink8 = glueContext.write_dynamic_frame.from_options(frame = relationalize5.select('root'), connection_type = "s3", connection_options = {"path": f"s3://{output_bucket_name}/{env}/anonymous/edocs/esocial_s-2200", "partitionKeys": ["tenant", "year", "month", "day"]}, format = "parquet", transformation_ctx = "datasink8")
    job.commit()

The stack trace is:

File "/tmp/raw_edocs_s_2200.py", line 55, in <module>
    relationalize5 = Relationalize.apply(frame = unbox4, staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200", name = "root", transformation_ctx = "relationalize5")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/transforms/transform.py", line 24, in apply
    return transform(*args, **kwargs)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/transforms/relationalize.py", line 47, in __call__
    return frame.relationalize(name, staging_path, options, transformation_ctx, info, stageThreshold, totalThreshold)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 344, in relationalize
    long(stageThreshold), long(totalThreshold)))
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o97.relationalize.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3190 in stage 1.0 failed 4 times, most recent failure: Lost task 3190.3 in stage 1.0 (TID 9056, 172.36.129.80, executor 1): java.io.FileNotFoundException: /tmp/blockmgr-5e470d53-7285-469b-8eb2-5e1c9b43e02c/1e/rdd_1039_3190 (Too many open files)
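The FileNotFoundException with "Too many open files" points at the executor's block manager: stage 1.0 clearly has thousands of tasks (task 3190 is the one failing), so each executor keeps a very large number of block and shuffle files open at the same time. A quick way to confirm how many partitions feed Relationalize, assuming the same variable names as the script above, is:

# Hypothetical diagnostic: count the partitions of the frame passed to Relationalize.
# A count in the thousands means each executor juggles that many block/shuffle files,
# which can exhaust the OS file-descriptor limit on the worker.
num_partitions = unbox4.toDF().rdd.getNumPartitions()
print(f"unbox4 partitions: {num_partitions}")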

My job configuration is as follows:

  GlueS2200RawJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: "glueetl"
        PythonVersion: 3
        ScriptLocation: !Sub s3://${WorkBucketName}/${Environment}/anonymous/glue_jobs/raw_edocs_s_2200.py
      DefaultArguments:
        "--job-bookmark-option": "job-bookmark-enable"
        "--ENVIRONMENT": !Ref Environment
        "--WORK_BUCKET_NAME": !Ref WorkBucketName
        "--OUTPUT_BUCKET_NAME": !Ref OutputBucketName
      GlueVersion: "2.0"
      Name: !Sub ${Environment}_raw_edocs_s_2200
      NumberOfWorkers: 2
      Role: !Ref GlueS22
      Tags:
        env: !Ref Environment

Does anyone know of a way to solve this problem?

To fix this, I needed to coalesce the DataFrame and reduce the number of partitions:

from awsglue.dynamicframe import DynamicFrame

# Target partition count: never below the cluster's total executor cores,
# and never below 200, but far fewer than the thousands produced upstream.
cores = int(sc.getConf().get('spark.executor.cores'))
instances = int(sc.getConf().get('spark.executor.instances'))
max_partitions = 200

# Coalesce the unboxed frame so each executor keeps far fewer files open at once
coalesced_df = unbox4.toDF().coalesce(max(cores * instances, max_partitions))
coalesced5 = DynamicFrame.fromDF(coalesced_df, glueContext, 'coalesced5')
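
With the coalesced frame in place, the downstream transforms read from coalesced5 instead of unbox4. A minimal sketch of the adjusted Relationalize call, reusing the same staging path and options as the original script:

# Relationalize now runs over the coalesced frame, so each task works on
# fewer, larger partitions and keeps far fewer temporary files open.
relationalize5 = Relationalize.apply(
    frame = coalesced5,
    staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200",
    name = "root",
    transformation_ctx = "relationalize5"
)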