将参数从 BranchPythonOperator 传递给 PythonOperator
Pass arguments from BranchPythonOperator to PythonOperator
我对 Airflow 还是很陌生,正在尝试弄清楚 tasks/dags 之间传递参数的逻辑。我的问题是 - 是否可以将参数从 BranchPythonOperator 任务传递到它调用的 task_id 中。
即:
@task
def task_a():
***print(a)***
return {}
def get_task_run(**kwargs):
a = 'Pass-Argument'
return 'task_a'
tasks = BranchPythonOperator(
task_id='get_task_run',
python_callable=get_task_run,
)
例如,在上面的代码中,是否有可能以某种方式获取从 BranchPythonOperator 调用的 'task_a' 中的变量 'a'?
一种方法是使用 get_task_run
函数执行 xcom_push
,然后使用 get_current_context
.[=20= 从 task_a
中提取它]
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context, BranchPythonOperator
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1),
catchup=False, tags=['example'])
def decorated_dag():
@task
def task_a():
context = get_current_context()
var_from_branch_task = context['ti'].xcom_pull(
task_ids='branch_task', key='a')
print(f"Result: {var_from_branch_task}")
@task
def task_b():
print('task_b')
def _get_task_run(ti):
if 'something':
ti.xcom_push(key='a', value='var_pushed_from_branch task')
return 'task_a'
else:
return 'task_b'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=_get_task_run,
)
task_a_exec = task_a()
task_b_exec = task_b()
branch_task >> [task_a_exec, task_b_exec]
example_decorated_dag = decorated_dag()
请记住,BranchPythonOperator
应该 return 单个 task_id 或 [=28 的列表=]s 跟随。这就是为什么你不能 return a dict
或 list
或 tuple
将它用作 XcomArg
与其他装饰任务。如果这对您有用,请告诉我!
我对 Airflow 还是很陌生,正在尝试弄清楚 tasks/dags 之间传递参数的逻辑。我的问题是 - 是否可以将参数从 BranchPythonOperator 任务传递到它调用的 task_id 中。
即:
@task
def task_a():
***print(a)***
return {}
def get_task_run(**kwargs):
a = 'Pass-Argument'
return 'task_a'
tasks = BranchPythonOperator(
task_id='get_task_run',
python_callable=get_task_run,
)
例如,在上面的代码中,是否有可能以某种方式获取从 BranchPythonOperator 调用的 'task_a' 中的变量 'a'?
一种方法是使用 get_task_run
函数执行 xcom_push
,然后使用 get_current_context
.[=20= 从 task_a
中提取它]
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context, BranchPythonOperator
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1),
catchup=False, tags=['example'])
def decorated_dag():
@task
def task_a():
context = get_current_context()
var_from_branch_task = context['ti'].xcom_pull(
task_ids='branch_task', key='a')
print(f"Result: {var_from_branch_task}")
@task
def task_b():
print('task_b')
def _get_task_run(ti):
if 'something':
ti.xcom_push(key='a', value='var_pushed_from_branch task')
return 'task_a'
else:
return 'task_b'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=_get_task_run,
)
task_a_exec = task_a()
task_b_exec = task_b()
branch_task >> [task_a_exec, task_b_exec]
example_decorated_dag = decorated_dag()
请记住,BranchPythonOperator
应该 return 单个 task_id 或 [=28 的列表=]s 跟随。这就是为什么你不能 return a dict
或 list
或 tuple
将它用作 XcomArg
与其他装饰任务。如果这对您有用,请告诉我!