Airflow Logs BrokenPipeException

I am running a clustered Airflow environment in which I have four AWS EC2 instances for the servers.

ec2-instances

My setup has been working fine for three months, but occasionally, about once a week, I get a Broken Pipe Exception when Airflow tries to log something.

*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log

[2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1407} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
[2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py']
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,533] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py
[2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - --- Logging error ---

[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most recent call last):

[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
    self.flush()

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
    self.stream.flush()

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: [Errno 32] Broken pipe

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack:

[2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING -   File "/usr/bin/airflow", line 27, in <module>
    args.func(args)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
    result = task_copy.execute(context=context)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
    directory = os.listdir(full_path)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, in handle_timeout
    self.log.error("Process timed out")

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process timed out'
Arguments: ()

[2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):
  File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
    directory = os.listdir(full_path)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout
[2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED.
[2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout

Sometimes the error will also say

*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
*** Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log

I'm not sure why the logs work about 95% of the time but randomly fail the rest of the time. These are the log settings in my Airflow.cfg file:

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/ec2-user/airflow/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =
encrypt_s3_logs = False

# Logging level
logging_level = INFO

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Name of handler to read task instance logs.
# Default to use file task handler.
task_log_reader = file.task

# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile = 

# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# How often should stats be printed to the logs
print_stats_interval = 30

child_process_log_directory = /home/ec2-user/airflow/logs/scheduler

I was wondering if I should try a different logging technique, such as writing the logs to an S3 bucket, or if there is something else I can do to fix this issue.
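
For reference, switching task logs to S3 roughly comes down to the following settings in a 1.10-style airflow.cfg (on 1.9 this is instead wired up through a custom logging_config_class); the bucket path and connection id here are just placeholders:

# Sketch of 1.10-style S3 remote logging settings (placeholder bucket/connection)
remote_logging = True
remote_base_log_folder = s3://my-airflow-logs/task-logs
remote_log_conn_id = my_s3_conn
encrypt_s3_logs = False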

Update:

Writing the logs to S3 did not resolve this issue. Also, the error is more consistent now (still sporadic); it is happening about 50% of the time. One thing I noticed is that the task it happens on is my AWS EMR creation task. Starting an AWS EMR cluster takes about 20 minutes, and then the task has to wait for the Spark commands to run on the EMR cluster, so that single task runs for around 30 minutes. I'm wondering if this is too long for an Airflow task to run and whether that is why it starts failing to write to the log. If that is the case, I could break up the EMR task so there is one task for the EMR creation and then another task for the Spark commands on the EMR cluster.
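
If I go that route, a minimal sketch of the split DAG might look like the following, using the contrib EMR operators/sensor that ship with Airflow 1.9/1.10; the cluster overrides, the Spark step, and the connection ids are placeholders, not my actual job:

# Sketch only: split "create EMR + run Spark" into separate, shorter tasks.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

dag = DAG('emr_split_sketch', start_date=datetime(2018, 7, 1), schedule_interval='@daily')

# Task 1: only create the cluster (~20 minutes); the job flow id is pushed to XCom.
create_emr = EmrCreateJobFlowOperator(
    task_id='create_emr',
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    job_flow_overrides={'Name': 'example-cluster'},  # placeholder cluster config
    dag=dag,
)

# Task 2: submit the Spark command as an EMR step (step ids are returned via XCom).
add_step = EmrAddStepsOperator(
    task_id='add_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=[{
        'Name': 'spark_job',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', 's3://my-bucket/my_job.py'],  # placeholder
        },
    }],
    dag=dag,
)

# Task 3: a separate task just waits for the step to finish, so the long wait is not
# spent inside the same task that created the cluster.
watch_step = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag,
)

create_emr >> add_step >> watch_step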

Note:

I also created a new ticket on Airflow's Jira: https://issues.apache.org/jira/browse/AIRFLOW-2844

This issue is a symptom of another issue I just resolved here.

I hadn't been seeing the AirflowException: Celery command failed error for a while, because it showed up in the airflow worker logs. It wasn't until I watched the airflow worker logs in real time that I saw that, when the error was thrown, I was also getting the BrokenPipeException in my task.

It gets somewhat weirder though. I would only see the BrokenPipeException thrown if I did print("something to log") and the AirflowException: Celery command failed... error occurred on the worker node. When I changed all of my print statements to use import logging ... logging.info("something to log"), I would not see the BrokenPipeException, but the task would still fail because of the AirflowException: Celery command failed... error. However, had I not seen the BrokenPipeException thrown in my Airflow task logs, I wouldn't have known why the task was failing, because once I eliminated the print statements I never saw any error in the Airflow task logs (only in the airflow worker logs).

So long story short, there are a few takeaways.

  1. Don't do print("something to log"). Use Airflow's built-in logging by importing the logging module and then using the logging class, i.e. import logging and then logging.info("something to log") (see the sketch after this list).

  2. If you are using an AWS EC2 instance as your server for Airflow then you may be experiencing this issue: https://github.com/apache/incubator-airflow/pull/2484. A fix for this issue has already been integrated into Airflow version 1.10 (I'm currently using Airflow version 1.9), so upgrade your Airflow version to 1.10. You can also use pip install git+git://github.com/apache/incubator-airflow.git@v1-10-stable. If you don't want to upgrade your Airflow version, you could follow the steps in the GitHub issue and either manually update the file with the fix or fork Airflow and cherry-pick the commit that fixes it.
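
To make takeaway 1 concrete, here is a minimal sketch of what the change looks like inside a custom sensor like the OmegaFileSensor from the traceback above; the path and the poke logic are placeholders, not my actual plugin:

# Sketch only: use the logging module (or self.log) instead of print() inside the
# custom sensor, so output goes through Airflow's task log handlers rather than a
# pipe that can break.
import logging
import os

from airflow.operators.sensors import BaseSensorOperator  # Airflow 1.9 import path


class OmegaFileSensor(BaseSensorOperator):
    def poke(self, context):
        full_path = '/data/incoming'  # placeholder directory
        logging.info("Polling %s", full_path)       # instead of print("Polling ...")
        # self.log.info("Polling %s", full_path)    # also works in 1.9+
        return len(os.listdir(full_path)) > 0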