Pull an XCom value inside a custom Airflow operator
I wrote a custom operator in Airflow called HadoopPutHdfs. I need to pass an xxx parameter to HadoopPutHdfs, and I need to fill xxx with the return value of the generate_file_path task:
with DAG(dag_id='my_custom_operator_dag', schedule_interval='1 * * * *',
         default_args=default_args, catchup=False) as dag:
    generate_file_path = PythonOperator(
        task_id='generate_file_path',
        python_callable=generate_file_path_func,
        dag=dag,
    )
    put_to_hdfs = HadoopPutHdfs(
        task_id='put_to_hdfs',
        headers={'Content-Type': 'text/plain'},
        hdfs_path='webhdfs/v1/user/hive/13.zip',
        hadoop_host='10.10.10.146',
        hadoop_port=9870,
        source_path='/opt/airflow/dags/1.zip',
        dag=dag,
        xxx="{{ ti.xcom_pull(task_ids=['generate_file_path']) }}",
    )
This line does not work:

xxx="{{ ti.xcom_pull(task_ids=['generate_file_path']) }}"

How can I pass the return value of the generate_file_path function to the xxx parameter?
It sounds like your custom operator is missing the declaration of xxx as a template_field. For example:
class CustomDummyOperator(BaseOperator):
    template_fields = ('msg_from_previous_task',)

    def __init__(self,
                 msg_from_previous_task,
                 *args, **kwargs) -> None:
        super(CustomDummyOperator, self).__init__(*args, **kwargs)
        self.msg_from_previous_task = msg_from_previous_task

    def execute(self, context):
        print(f"Message: {self.msg_from_previous_task}")
DAG:
def return_a_str():
    return "string_value_from_op1"

task_1 = PythonOperator(
    task_id='task_1',
    dag=dag,
    python_callable=return_a_str,
)

task_2 = CustomDummyOperator(
    task_id='task_2',
    dag=dag,
    msg_from_previous_task="{{ ti.xcom_pull(task_ids='task_1') }}"
)
The output of task_2 is: Message: string_value_from_op1
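A side note on the template in the question: xcom_pull with a list of task_ids returns a list of values, while a plain string returns the single value. This can be simulated with plain Jinja; FakeTI and the path '/data/13.zip' below are made up for illustration only:

```python
from jinja2 import Template

# Toy stand-in for Airflow's task instance, to show how the template renders.
class FakeTI:
    def xcom_pull(self, task_ids):
        values = {'generate_file_path': '/data/13.zip'}
        if isinstance(task_ids, str):
            return values[task_ids]            # single value
        return [values[t] for t in task_ids]   # list of values

ti = FakeTI()
print(Template("{{ ti.xcom_pull(task_ids=['generate_file_path']) }}").render(ti=ti))
# -> ['/data/13.zip']  (a stringified list)
print(Template("{{ ti.xcom_pull(task_ids='generate_file_path') }}").render(ti=ti))
# -> /data/13.zip      (the bare value)
```

So if xxx should receive the plain path, passing task_ids='generate_file_path' (a string, not a list) is likely what you want.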
You can use XcomArg for a cleaner syntax:
task_2 = CustomDummyOperator(
    task_id='task_2',
    dag=dag,
    msg_from_previous_task=task_1.output
    # msg_from_previous_task="{{ ti.xcom_pull(task_ids='task_1') }}"
)
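Applied to the operator from the question, the same pattern would look roughly like this. This is a sketch under the assumption that HadoopPutHdfs keeps the constructor arguments shown in the question; the BaseOperator stub below only replaces the real Airflow import (from airflow.models import BaseOperator) so the snippet is self-contained:

```python
# Minimal stub of Airflow's BaseOperator so the sketch runs without Airflow
# installed; in a real DAG, import the real BaseOperator instead.
class BaseOperator:
    def __init__(self, task_id=None, **kwargs):
        self.task_id = task_id

class HadoopPutHdfs(BaseOperator):
    # Listing 'xxx' here is what makes Airflow render its Jinja template
    # before execute() runs.
    template_fields = ('xxx',)

    def __init__(self, xxx=None, hdfs_path=None, hadoop_host=None,
                 hadoop_port=None, source_path=None, headers=None, **kwargs):
        super().__init__(**kwargs)
        self.xxx = xxx
        self.hdfs_path = hdfs_path
        self.hadoop_host = hadoop_host
        self.hadoop_port = hadoop_port
        self.source_path = source_path
        self.headers = headers or {}

    def execute(self, context):
        # By this point, self.xxx holds the value pulled from XCom.
        print(f"xxx resolved to: {self.xxx}")
```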