Airflow - 如何增加存储在 XCom 中的值
Airflow - How to incerase a value stored in XCom
我已经从数据库中获取了一个值并将其存储在 XCom 中,我想将其增加 1。我尝试通过以下方法增加它但没有任何运气。是否可以增加存储在 XCom 中的值?
'{{ ti.xcom_pull("task_id") + 1}}'
'{{ int(ti.xcom_pull("task_id")) + 1}}'
编辑
这是我的气流 DAG 的一部分。我有一项任务是从 Hbase 中提取数据:
pull_data_hbase = BashOperator(
task_id='pull_data_hbase',
dag=dag,
bash_command=<My_command_for_exract_data_from_hbase>,
xcom_push=True)
另一个更新 table 增量 1 的任务:
data_to_hbase = BashOperator(
task_id='data_to_hbase',
dag=dag,
bash_command=<Command_for_update_table_with_XCom_value>
% ('{{ ti.xcom_pull("pull_data_hbase") +1 }}')
)
当我使用 '{{ int(ti.xcom_pull("task_id")) + 1}}'
时,我收到以下消息:
[2022-01-13 20:39:47,104] {base_task_runner.py:101} INFO - Job
3868282: Subtask print_prev_task ('type:', "{{
ti.xcom_pull('pull_data_hbase') }}") [2022-01-13 20:39:47,105]
{base_task_runner.py:101} INFO - Job 3868282: Subtask print_prev_task
[2022-01-13 20:39:47,103] {cli.py:520} INFO - Running <TaskInstance:
tv_paramount_monthly_report2.0.7-SNAPSHOT.print_prev_task
2021-11-15T00:00:00+00:00 [running]> on host
dl100ven01.ddc.teliasonera.net
[2022-01-13 20:39:47,159]
{models.py:1788} ERROR - 'int' is undefined
您可以编写一个实际的 Python 函数,并将其作为宏传递到您的 DAG 中。
然后可以从气流模板值调用该函数。
用户宏字典中的键名是模板中使用的名称。
例如
def increment(task_instance, task_id):
return int(task_instance.xcom_pull(task_id)) + 1
with DAG(
dag_id='dag_id',
user_defined_macros={'increment': increment},
) as dag:
pull_data_hbase = BashOperator(
task_id='pull_data_hbase',
dag=dag,
bash_command='echo x+1={{ increment(ti, "task_id") }}',
xcom_push=True,
)
您无权访问 Jinja 模板中的 Python libraries/functions。 TLDR 答案是:
"{{ ti.xcom_pull('pull_data_hbase') | int + 1 }}"
您可以在 Jinja 模板中使用某些功能,这些在 Jinja 中称为“宏”。 Airflow 提供了几个开箱即用的宏:https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#macros。您还可以提供自己的宏,如@Hitobat 所示。
您可以在 Jinja 模板中使用的另一件事是“过滤器”(参见 built-in filters)。这些可以用管道 (|
) 应用,如上所示使用 int
过滤器。
我已经从数据库中获取了一个值并将其存储在 XCom 中,我想将其增加 1。我尝试通过以下方法增加它但没有任何运气。是否可以增加存储在 XCom 中的值?
'{{ ti.xcom_pull("task_id") + 1}}'
'{{ int(ti.xcom_pull("task_id")) + 1}}'
编辑
这是我的气流 DAG 的一部分。我有一项任务是从 Hbase 中提取数据:
pull_data_hbase = BashOperator(
task_id='pull_data_hbase',
dag=dag,
bash_command=<My_command_for_exract_data_from_hbase>,
xcom_push=True)
另一个更新 table 增量 1 的任务:
data_to_hbase = BashOperator(
task_id='data_to_hbase',
dag=dag,
bash_command=<Command_for_update_table_with_XCom_value>
% ('{{ ti.xcom_pull("pull_data_hbase") +1 }}')
)
当我使用 '{{ int(ti.xcom_pull("task_id")) + 1}}'
时,我收到以下消息:
[2022-01-13 20:39:47,104] {base_task_runner.py:101} INFO - Job 3868282: Subtask print_prev_task ('type:', "{{ ti.xcom_pull('pull_data_hbase') }}") [2022-01-13 20:39:47,105] {base_task_runner.py:101} INFO - Job 3868282: Subtask print_prev_task [2022-01-13 20:39:47,103] {cli.py:520} INFO - Running <TaskInstance: tv_paramount_monthly_report2.0.7-SNAPSHOT.print_prev_task 2021-11-15T00:00:00+00:00 [running]> on host dl100ven01.ddc.teliasonera.net
[2022-01-13 20:39:47,159] {models.py:1788} ERROR - 'int' is undefined
您可以编写一个实际的 Python 函数,并将其作为宏传递到您的 DAG 中。
然后可以从气流模板值调用该函数。 用户宏字典中的键名是模板中使用的名称。
例如
def increment(task_instance, task_id):
return int(task_instance.xcom_pull(task_id)) + 1
with DAG(
dag_id='dag_id',
user_defined_macros={'increment': increment},
) as dag:
pull_data_hbase = BashOperator(
task_id='pull_data_hbase',
dag=dag,
bash_command='echo x+1={{ increment(ti, "task_id") }}',
xcom_push=True,
)
您无权访问 Jinja 模板中的 Python libraries/functions。 TLDR 答案是:
"{{ ti.xcom_pull('pull_data_hbase') | int + 1 }}"
您可以在 Jinja 模板中使用某些功能,这些在 Jinja 中称为“宏”。 Airflow 提供了几个开箱即用的宏:https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#macros。您还可以提供自己的宏,如@Hitobat 所示。
您可以在 Jinja 模板中使用的另一件事是“过滤器”(参见 built-in filters)。这些可以用管道 (|
) 应用,如上所示使用 int
过滤器。