AWS Glue 无法写入镶木地板,内存不足

AWS Glue fail to write parquet, out of memory

我认为 AWS Glue 在写入 parquet 输出失败后 运行 内存不足...

An error occurred while calling o126.parquet. Job aborted due to stage failure: Task 82 in stage 9.0 failed 4 times, most recent failure: Lost task 82.3 in stage 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

下面有更完整的日志

Traceback (most recent call last): File "script_2019-01-29-06-53-53.py", line 71, in .parquet("s3://.../flights2") File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.parquet. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:213) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:166) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:166) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 82 in stage 9.0 failed 4 times, most recent failure: Lost task 82.3 in stage 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1505) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1504) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676) at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:186)

失败的行似乎是:

.parquet("s3://pinfare-glue/flights2")

我的 Glue 作业如下所示:有什么办法可以解决这个问题吗?我正在考虑从 S3 中删除一些文件夹,以便 Glue 批量处理数据……但这不可扩展……

另一件事是,我可能会为每个日期创建一个数据框,然后循环写入这些较小的分区……但这会很慢吗?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import regexp_replace, to_timestamp

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print(">>> READING ...")
inputGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="inputGDF")
# inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://pinfare-actuary-storage-csv"], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")
print(">>> DONE READ ...")

flightsDf = inputGDF.toDF()
if bool(flightsDf.head(1)):
    df = flightsDf \
        .drop("createdat") \
        .drop("updatedat") \
        .withColumn("agent", flightsDf["agent"].cast("int")) \
        .withColumn("querydestinationplace", flightsDf["querydestinationplace"].cast("int")) \
        .withColumn("querydatetime", regexp_replace(flightsDf["querydatetime"], "-", "").cast("int")) \
        .withColumn("queryoutbounddate", regexp_replace(flightsDf["queryoutbounddate"], "-", "").cast("int")) \
        .withColumn("queryinbounddate", regexp_replace(flightsDf["queryinbounddate"], "-", "").cast("int")) \
        .withColumn("outdeparture", to_timestamp(flightsDf["outdeparture"], "yyyy-MM-ddTHH:mm:ss")) \
        .withColumn("outarrival", to_timestamp(flightsDf["outarrival"], "yyyy-MM-ddTHH:mm:ss")) \
        .withColumn("indeparture", to_timestamp(flightsDf["indeparture"], "yyyy-MM-ddTHH:mm:ss")) \
        .withColumn("inarrival", to_timestamp(flightsDf["inarrival"], "yyyy-MM-ddTHH:mm:ss")) \

    df.createOrReplaceTempView("flights")

    airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
    airportsDF = airportsGDF.toDF()
    airportsDF.createOrReplaceTempView("airports")

    agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
    agentsRawDF = agentsGDF.toDF()
    agentsRawDF.createOrReplaceTempView("agents_raw")

    agentsDF = spark.sql("""
        SELECT id, name, type FROM agents_raw
        WHERE type IN ('Airline', 'TravelAgent')
    """) 
    agentsDF.createOrReplaceTempView("agents")

    finalDf = spark.sql("""
            SELECT /*+ BROADCAST(agents) */ /*+ BROADCAST(airports) */
                f.*, countryName, cityName, airportName, a.name AS agentName,
                CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
            FROM flights f
            LEFT JOIN agents a
            ON f.agent = a.id
            LEFT JOIN airports p
            ON f.querydestinationplace = p.airportId
        """)
    print(">>> DONE PROCESS FLIGHTS")

    print("Writing ...")
    finalDf \
      .write \
      .mode("append") \
      .partitionBy(["countryName", "querydatetime"]) \
      .parquet("s3://.../flights2")
else:
    print("Nothing to write ...")

job.commit()

import boto3
glue_client = boto3.client('glue', region_name='ap-southeast-1')
glue_client.start_crawler(Name='...')

如果您的 LEFT JOIN 具有 1:N 映射,它将导致 DF 中的指数级大行,这可能会导致 OOM。在 glue 中,没有设置您自己的基础设施配置的规定,例如每个 vCPU 64GB 内存。如果是这种情况,首先尝试使用 spark.yarn.executor.memoryOverhead 选项 or/and 增加 DPU。否则,您必须使用下推谓词对数据进行存储,然后 运行 在 for 循环中遍历所有数据