如何处理 AWS Glue 中映射函数中的错误?

How do I handle errors in mapped functions in AWS Glue?

我正在使用 DynamicFrame 的 map 方法(或者,等效地,Map.apply 方法)。我注意到我传递给这些函数的函数中的任何错误都被默默地忽略并导致返回的 DynamicFrame 为空。

假设我有这样的工作脚本:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *

glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database="radixdemo", table_name="census_csv")

def my_mapper(rec):
    import logging
    logging.error("[RADIX] An error-log from in the mapper!")
    print "[RADIX] from in the mapper!"
    raise Exception("[RADIX] A bug!")
dyF = dyF.map(my_mapper, 'my_mapper')

print "Count:  ", dyF.count()
dyF.printSchema()
dyF.toDF().show()

如果我在我的 Glue Dev Endpoint 中使用 gluepython 运行 这个脚本,我会得到这样的输出:

[glue@ip-172-31-83-196 ~]$ gluepython gluejob.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/05/23 20:56:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Count:   0
root

++
||
++
++

关于此输出的注释:

同样,当我将此脚本保存为 AWS Glue 控制台中的作业并 运行 时,该作业未指示发生任何错误——作业状态为 "Succeeded"。值得注意的是,我 do 得到 print 语句和 logging.error 调用输出到作业日志,但仅在常规 "Logs" 中,而不是 "Error Logs".

我想要的是能够指示我的作业失败了,并且能够很容易地找到这些错误日志。最重要的是只是表明它失败了。

有没有一种方法可以在映射函数中记录错误,以便 Glue 将其作为 "Error Log"(并将其放在单独的 AWS CloudWatch Logs 路径中)?如果发生这种情况,它会自动将整个作业标记为失败吗?或者是否有其他方法可以从映射函数中明确地使作业失败?

(我的计划是,如果有记录错误的方法 and/or 将作业标记为失败,我的计划是创建一个装饰器或其他实用函数,它们将自动捕获映射函数中的异常并确保它们被记录并标记为失败)。

我发现使 Glue 作业显示为“失败”的唯一方法是从主脚本引发异常(not 在映射器或过滤器函数中,因为那些似乎被分拆到数据处理单元)。

幸运的是,一种检测映射或过滤器函数内部是否发生异常的方法:使用DynamicFrame.stageErrorsCount()方法。它将 return 一个数字,指示在 运行 最近一次转换时引发了多少异常。

那么解决所有问题的正确方法是:

  • 确保您的映射或转换函数显式记录其中发生的任何异常。这最好通过使用装饰器函数或其他一些可重用机制来完成,而不是依赖于在您编写的每个函数中放置 try/except 语句。
  • 在您想要捕获错误的每个转换之后,调用 stageErrorsCount() 方法并检查它是否大于 0。如果您想中止作业,只需引发异常。

例如:

import logging

def log_errors(inner):
    def wrapper(*args, **kwargs):
        try:
            return inner(*args, **kwargs)
        except Exception as e:
            logging.exception('Error in function: {}'.format(inner))
            raise
    return wrapper

@log_errors
def foo(record):
    1 / 0

然后,在你的工作中,你会做这样的事情:

df = df.map(foo, "foo")
if df.stageErrorsCount() > 0:
    raise Exception("Error in job! See the log!")

请注意,出于某种原因,即使从映射器函数内部调用 logging.exception 仍然不会将日志写入 AWS CloudWatch Logs 中的 error 日志。它被写入常规成功日志。但是,使用这种技术,您至少会看到作业失败并能够在日志中找到信息。另一个警告:Dev Endpoints 似乎没有显示来自映射器或过滤器函数的任何日志。