气流参数传递
Airflow parameter passing
我有一个简单的工作,如果可能的话,我想在 Airflow 流程下移动。就目前而言,我有一串 bash 脚本,它们访问服务器并下载文件的最新版本,然后对该文件执行各种下游操作。
exec ./somescript.sh somefileurl
我想知道的是:如何在每次需要运行这个过程时将URL传递给这个文件?
似乎如果我尝试 运行 bash 脚本作为 bash 命令,就像这样:
download = BashOperator(
task_id='download_release',
bash_command='somescript.sh',
# params={'URL': 'somefileurl'},
dag=dag)
我无法传入 bash 脚本需要的一个参数。否则,如果我尝试将 bash 脚本作为 bash 命令发送,如下所示:
download = BashOperator(
task_id='download_release',
bash_command='./somescript.sh {{ URL }}',
params={'URL': 'somefileurl'},
dag=dag)
当程序试图在临时目录的上下文中执行脚本时,我收到一个执行错误。这会破坏脚本,因为它需要访问位于同一目录中的一些凭据文件,我想保持相关文件位置不变...
想法?
更新:对我有用的方法
download = BashOperator(
task_id='download_release',
bash_command='cd {{ params.dir }} && ./somescript.sh {{ params.url }}',
params={'url': 'somefileurl',
'dir': 'somedir'},
dag=dag)
不过我还没有实现任何参数传递。
这是将参数传递给 BashOperator 的示例:
templated_command = """
cd /working_directory
somescript.sh {{ dag_run.conf['URL'] }}
"""
download = BashOperator(
task_id='download_release',
bash_command=templated_command,
dag=dag)
有关此的讨论,请参阅 passing parameters to externally trigged dag. Airflow has two example DAG's that demonstrate this: example_trigger_controller_dag and example_trigger_target_dag. Also, see the Airflow api reference on macros。
我有一个简单的工作,如果可能的话,我想在 Airflow 流程下移动。就目前而言,我有一串 bash 脚本,它们访问服务器并下载文件的最新版本,然后对该文件执行各种下游操作。
exec ./somescript.sh somefileurl
我想知道的是:如何在每次需要运行这个过程时将URL传递给这个文件?
似乎如果我尝试 运行 bash 脚本作为 bash 命令,就像这样:
download = BashOperator(
task_id='download_release',
bash_command='somescript.sh',
# params={'URL': 'somefileurl'},
dag=dag)
我无法传入 bash 脚本需要的一个参数。否则,如果我尝试将 bash 脚本作为 bash 命令发送,如下所示:
download = BashOperator(
task_id='download_release',
bash_command='./somescript.sh {{ URL }}',
params={'URL': 'somefileurl'},
dag=dag)
当程序试图在临时目录的上下文中执行脚本时,我收到一个执行错误。这会破坏脚本,因为它需要访问位于同一目录中的一些凭据文件,我想保持相关文件位置不变...
想法?
更新:对我有用的方法
download = BashOperator(
task_id='download_release',
bash_command='cd {{ params.dir }} && ./somescript.sh {{ params.url }}',
params={'url': 'somefileurl',
'dir': 'somedir'},
dag=dag)
不过我还没有实现任何参数传递。
这是将参数传递给 BashOperator 的示例:
templated_command = """
cd /working_directory
somescript.sh {{ dag_run.conf['URL'] }}
"""
download = BashOperator(
task_id='download_release',
bash_command=templated_command,
dag=dag)
有关此的讨论,请参阅 passing parameters to externally trigged dag. Airflow has two example DAG's that demonstrate this: example_trigger_controller_dag and example_trigger_target_dag. Also, see the Airflow api reference on macros。