为气流中的日志设置 s3

setting up s3 for logs in airflow

我正在使用 docker-compose 来设置可扩展的气流集群。我的方法基于这个 Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

我的问题是从 s3 将日志设置为 write/read。当一个 dag 完成时,我得到这样的错误

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

我像这样在 airflow.cfg 文件中设置了一个新部分

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

然后在airflow.cfg

的远程日志部分指定s3路径
remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

我是否设置正确并且存在错误?这里有我缺少的成功秘诀吗?

-- 更新

我尝试以 URI 和 JSON 格式导出,但似乎都不起作用。然后我导出 aws_access_key_id 和 aws_secret_access_key 然后气流开始拾取它。现在我在工作日志中得到了他的错误

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

-- 更新

我也找到了这个 link https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

然后我将我的一台工作机器(与网络服务器和调度程序分开)和 运行 python

中的这段代码进行了炮击
import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

我收到这个错误。

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

我尝试导出几种不同类型的 AIRFLOW_CONN_ envs,如连接部分 https://airflow.incubator.apache.org/concepts.html 和该问题的其他答案所述。

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

我也导出了 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY 但没有成功。

这些凭据存储在数据库中,所以一旦我将它们添加到 UI 中,工作人员应该可以获取它们,但由于某种原因它们无法 write/read 记录。

您需要通过 Airflow UI 设置 S3 连接。为此,您需要转到 airflow 管理 -> 连接 选项卡 UI 并为您的 S3 连接创建一个新行。

示例配置为:

Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

(更新自 Airflow 1.10.2)

如果您不使用管理员UI,这里有一个解决方案。

我的 Airflow 没有在持久服务器上 运行 ...(它每天在 Heroku 上的 Docker 容器中重新启动。)我知道我错过了很多很棒的功能,但是 在我的最小设置中,我从未接触过管理 UI 或 cfg 文件。 相反,我必须设置 Airflow-specific 环境变量在覆盖 .cfg 文件的 bash 脚本中。

apache-airflow[s3]

首先,您需要安装 s3 子包以将您的 Airflow 日志写入 S3。 (boto3 适用于 DAG 中的 Python 个作业,但 S3Hook 取决于 s3 子包。)

再补充一点:conda install doesn't handle this yet,所以我必须做 pip install apache-airflow[s3]

环境变量

在 bash 脚本中,我设置了这些 core 变量。从 these instructions 开始,但对环境变量使用命名约定 AIRFLOW__{SECTION}__{KEY},我这样做:

export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

S3 连接 ID

上面的s3_uri是我编的连接ID。在Airflow中,它对应另一个环境变量,AIRFLOW_CONN_S3_URI。该值是您的 S3 路径,必须采用 URI 形式。那是

s3://access_key:secret_key@bucket/key

存储这个但是你处理其他敏感的环境变量。

通过此配置,Airflow 将能够将您的日志写入 S3。他们将遵循 s3://bucket/key/dag/task_id/timestamp/1.log.

的路径

关于从 Airflow 1.8 升级到 Airflow 1.10 的附录

我最近将我的生产管道从 Airflow 1.8 升级到 1.9,然后是 1.10。好消息是变化很小;剩下的工作只是弄清楚软件包安装的细微差别(与关于 S3 日志的原始问题无关)。

(1) 首先,我需要使用 Airflow 1.9 升级到 Python 3.6。

(2) 包名称从 airflow 更改为 apache-airflow 1.9。您也可以在 pip install.

中 运行 变成 this

(3) 包 psutil 必须在 Airflow 的特定版本范围内。您在执行 pip install apache-airflow.

时可能会遇到这种情况

(4) Airflow 1.9+ 需要 python3-dev headers。

(5) 以下是实质性更改:现在需要 export AIRFLOW__CORE__REMOTE_LOGGING=True。并且

(6) 日志在 S3 中的路径略有不同,我在答案中对其进行了更新:s3://bucket/key/dag/task_id/timestamp/1.log.

但仅此而已!日志在 1.9 中不起作用,所以我建议直接转到 1.10,现在它可用了。

更新 Airflow 1.10 使日志记录 a lot easier.

对于 s3 日志记录,根据

设置连接挂钩

然后只需将以下内容添加到 airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False

对于 gcs 日志记录,

  1. 首先安装gcp_api包,像这样:pip install apache-airflow[gcp_api].

  2. 按照

  3. 设置连接钩子
  4. 将以下内容添加到airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn
    

注意:从 Airflow 1.9 开始,远程日志记录已 significantly altered。如果您使用的是 1.9,请继续阅读。

引用here

完整说明:

  1. 创建一个目录来存储配置并将其放置在 PYTHONPATH 中。一个例子是 $AIRFLOW_HOME/config

  2. 创建名为 $AIRFLOW_HOME/config/log_config.py 的空文件并 $AIRFLOW_HOME/config/__init__.py

  3. airflow/config_templates/airflow_local_settings.py的内容复制到上面步骤刚创建的log_config.py文件中。

  4. 自定义模板的以下部分:

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
    
  5. 确保已根据 在 Airflow 中定义了 s3 连接挂钩。该挂钩应该具有对上面 S3_LOG_FOLDER 中定义的 s3 存储桶的读写访问权限。

  6. 更新 $AIRFLOW_HOME/airflow.cfg 以包含:

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
    
  7. 重新启动 Airflow 网络服务器和调度程序,并触发(或等待)新任务执行。

  8. 验证日志是否显示您定义的存储桶中新执行的任务。

  9. 验证 s3 存储查看器是否在 UI 中工作。调出一个新执行的任务,并确认您看到类似以下内容:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
    

要使用最近的 Airflow 更新完成 Arne 的回答,您无需将 task_log_reader 设置为默认值以外的其他值:task

就好像您遵循默认的日志记录模板 airflow/config_templates/airflow_local_settings.py you can see since this commit(注意处理程序的名称已更改为 's3': {'task'... 而不是 s3.task),这就是远程文件夹中的值(REMOTE_BASE_LOG_FOLDER) 将用正确的处理程序替换处理程序:

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])

有关如何从 S3 登录 to/read 的更多详细信息:https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3

只是给遵循 中非常有用的说明的任何人的旁注: 如果您偶然发现这个问题:“ModuleNotFoundError: No module named 'airflow.utils.log.logging_mixin.RedirectStdHandler'" (在使用气流 1.9 时发生),修复很简单 - 使用此基本模板:https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py(并遵循 中的所有其他说明)

master 分支中存在的当前模板 incubator-airflow/airflow/config_templates/airflow_local_settings.py 包含对 class "airflow.utils.log.s3_task_handler.S3TaskHandler" 的引用,它在 apache-airflow==1.9.0 [=24= 中不存在] 包裹。 希望这对您有所帮助!

让它在 kube 中使用 Airflow 1.10。 我有以下 env var 集:

AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3

呸!将气流错误扼杀在萌芽状态的动机是将其视为一堆 python 文件 XD 这是我在 apache-airflow==1.9.0.

方面的经验

首先,根本不需要尝试 airflow connections .......... --conn_extra 等等等等

只需将您的 airflow.cfg 设置为:

remote_logging = True
remote_base_log_folder = s3://dev-s3-main-ew2-dmg-immutable-potns/logs/airflow-logs/
encrypt_s3_logs = False

# Logging level
logging_level = INFO
fab_logging_level = WARN

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = s3://<ACCESS-KEY>:<SECRET-ID>@<MY-S3-BUCKET>/<MY>/<SUB>/<FOLDER>/

保持 $AIRFLOW_HOME/config/__ init __.py$AIRFLOW_HOME/config/log_config.py 文件如上。

我的问题是缺少“boto3”包,我可以通过以下方式解决:

vi /usr/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py
then >> import traceback

并在包含以下内容的行中:

无法创建连接 ID 为“%s”的 S3Hook。 ' 'Please make sure that airflow[s3] is installed and ' '存在 S3 连接。

做了一个 traceback.print_exc() 并且它开始抱怨缺少 boto3!

安装它,生活再次变得美好!