为 apache 气流任务获取 unique_id
Getting unique_id for apache airflow tasks
我是气流的新手。在我公司的 ETL 管道中,目前我们使用 Crontab 和自定义调度程序(内部开发)。现在我们计划为我们所有的数据管道场景实施 apache 气流。对于在探索无法为每个任务 Instances/Dag 找到 unique_id 的功能时。当我搜索大多数解决方案时,最终都出现在宏和模板中。但是 [=其中 40=] 没有为任务提供 uniqueID。但是我能够在每个任务的 UI 中看到增量 uniqueID。有什么方法可以轻松访问我的 python 方法中的那些变量.主要用例是我需要将这些 ID 作为参数传递给 Python/ruby/Pentaho 作业,称为 scripts/Methods .
例如
我的 shell 脚本 'test.sh ' 需要两个参数,一个是 run_id,另一个是 collection_id。目前我们正在从中央数据库生成这个唯一的 run_id 并将其传递给作业。如果它已经存在于气流上下文中,我们将使用它
from airflow.operators.bash_operator import BashOperator
from datetime import date, datetime, timedelta
from airflow import DAG
shell_command = "/data2/test.sh -r run_id -c collection_id"
putfiles_s3 = BashOperator(
task_id='putfiles_s3',
bash_command=shell_command,
dag=dag)
在执行此 Dag(scheduled/manual) 时为每个 运行 寻找唯一的 run_id(任一 Dag level/task 级别)
注意:这是一个示例任务。此 Dag 将有多个依赖任务。
附加 Job_Id 气流 UI 的屏幕截图
谢谢
阿诺普
{{ ti.job_id }}
就是你想要的:
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
dag = DAG(
"job_id",
start_date=datetime(2018, 1, 1),
)
with dag:
BashOperator(
task_id='unique_id',
bash_command="echo {{ ti.job_id }}",
)
这将在运行时有效。此执行的日志如下所示:
[2018-01-03 10:28:37,523] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpcj0omuts//tmp/airflowtmpcj0omuts/unique_iddq7kw0yj
[2018-01-03 10:28:37,524] {bash_operator.py:88} INFO - Running command: echo 4
[2018-01-03 10:28:37,621] {bash_operator.py:97} INFO - Output:
[2018-01-03 10:28:37,648] {bash_operator.py:101} INFO - 4
请注意,这仅在运行时有效,因此 webui 中的 "Rendered Template" 视图将显示 None 而不是数字。
我是气流的新手。在我公司的 ETL 管道中,目前我们使用 Crontab 和自定义调度程序(内部开发)。现在我们计划为我们所有的数据管道场景实施 apache 气流。对于在探索无法为每个任务 Instances/Dag 找到 unique_id 的功能时。当我搜索大多数解决方案时,最终都出现在宏和模板中。但是 [=其中 40=] 没有为任务提供 uniqueID。但是我能够在每个任务的 UI 中看到增量 uniqueID。有什么方法可以轻松访问我的 python 方法中的那些变量.主要用例是我需要将这些 ID 作为参数传递给 Python/ruby/Pentaho 作业,称为 scripts/Methods .
例如
我的 shell 脚本 'test.sh ' 需要两个参数,一个是 run_id,另一个是 collection_id。目前我们正在从中央数据库生成这个唯一的 run_id 并将其传递给作业。如果它已经存在于气流上下文中,我们将使用它
from airflow.operators.bash_operator import BashOperator
from datetime import date, datetime, timedelta
from airflow import DAG
shell_command = "/data2/test.sh -r run_id -c collection_id"
putfiles_s3 = BashOperator(
task_id='putfiles_s3',
bash_command=shell_command,
dag=dag)
在执行此 Dag(scheduled/manual) 时为每个 运行 寻找唯一的 run_id(任一 Dag level/task 级别)
注意:这是一个示例任务。此 Dag 将有多个依赖任务。
附加 Job_Id 气流 UI 的屏幕截图
谢谢 阿诺普
{{ ti.job_id }}
就是你想要的:
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
dag = DAG(
"job_id",
start_date=datetime(2018, 1, 1),
)
with dag:
BashOperator(
task_id='unique_id',
bash_command="echo {{ ti.job_id }}",
)
这将在运行时有效。此执行的日志如下所示:
[2018-01-03 10:28:37,523] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpcj0omuts//tmp/airflowtmpcj0omuts/unique_iddq7kw0yj [2018-01-03 10:28:37,524] {bash_operator.py:88} INFO - Running command: echo 4 [2018-01-03 10:28:37,621] {bash_operator.py:97} INFO - Output: [2018-01-03 10:28:37,648] {bash_operator.py:101} INFO - 4
请注意,这仅在运行时有效,因此 webui 中的 "Rendered Template" 视图将显示 None 而不是数字。