运行 在 Google Cloud Composer 上使用 Airflow 的 shell 脚本文件

Run a shell script file with Airflow on Google Cloud Composer

我有几个多用途 shell 脚本存储在 .sh 文件中。我的意图是在 Cloud Composer 上构建一些将利用这些脚本的 Airflow DAG。 DAG 将主要由使用特定参数调用脚本的 BashOperators 组成。

这是一个简单的例子,greeter.sh:

#!/bin/bash
echo "Hello, !"

我可以像这样在本地运行它:

bash greeter.sh world
> Hello, world!

让我们写一个简单的DAG:

# import and define default_args

dag = DAG('bash_test',
          description='Running a local bash script',
          default_args=default_args,
          schedule_interval='0,30 5-23 * * *',
          catchup=False,
          max_active_runs=1)

bash_task = BashOperator(
    task_id='run_command',
    bash_command=f"bash greeter.sh world",
    dag=dag
)

但是脚本放在哪里greeter.sh?我尝试将它同时放在 dags/ 文件夹和 data/ 文件夹中,在第一层或嵌套在 dependencies/ 目录中。我还尝试将地址写为 ./greeter.sh。毫无意义:我永远找不到文件。

我也尝试使用 sh 代替 bash,但我得到了一个不同的错误:sh: 0: Can't open greeter.sh。但是当文件不存在时也会出现此错误,所以这是同样的问题。与 运行 chmod +rx.

的任何尝试相同

如何让 Airflow 可以使用我的文件?

的评论揭晓了答案。

dags_folder 的地址存储在 DAGS_FOLDER 环境变量中。

获取存储在 dags_folder/ 中的脚本的正确地址:

import os

DAGS_FOLDER = os.environ["DAGS_FOLDER"]
file = f"{DAGS_FOLDER}/greeter.sh"