气流 PythonBranchOperator returns 无效值
Airflow PythonBranchOperator returns invalid value
我在使用 Airflow 1.10 Python 分支操作员时遇到问题。我有一个扫描云存储桶的 dag,如果找到则处理文件。如果文件丢失,它会命中 no_file_found 虚拟运算符并完成,否则它会前进到一些解析步骤。
对于单个文件,此工作流程效果很好。当我为第二个文件添加相同的逻辑时,我的问题就出现了。目前 check_for_Post_Performance returns cleans_headers_for_gcm 任务,我完全不知道这是怎么发生的。从下面的大纲来看,它应该只有两条前进路径,clean_headers_Post_Perfromance 或 no_file_found.
我根据文件名列表动态创建这些任务。我遍历每个文件名并构建以下运算符:
def build_check(filename):
return BranchPythonOperator(
task_id=f'check_for_{file_name}'.replace(' ', '_'),
python_callable=check_file_exists,
op_kwargs={'filename': filename},
provide_context=True,
dag=dag
)
def check_file_exists(filename, **context):
xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')
if any(filename in s for s in xcom_value):
return f'clean_headers_for_{file_name}'.replace(' ', '_')
else:
return 'no_file_found'
我检查了呈现的任务模板以确认 'Post Performance' 已为文件名变量传递
但是在查看日志时,我看到以下内容:
[2021-12-02 20:15:56,742] {logging_mixin.py:120} INFO - Running <TaskInstance: example_dag.check_for_Post_Performance 2021-12-02T20:14:50.724084+00:00 [running]> on host 21d0393eb686
[2021-12-02 20:15:56,766] {python_operator.py:114} INFO - Done. Returned value was: clean_headers_for_GCM
[2021-12-02 20:15:56,767] {skipmixin.py:122} INFO - Following branch clean_headers_for_GCM
[2021-12-02 20:15:56,773] {skipmixin.py:158} INFO - Skipping tasks ['no_file_found', 'clean_headers_for_Post_Performance']
我最好的猜测是函数并没有像我认为的那样在每个循环中创建,或者某些触发规则使我感到困惑。我怎样才能让源列表中的每个文件彼此独立地到达 no_file_found 或 clean_headers 任务?
编辑
这是我用来从静态列表构建任务的代码:
for file_name, table_name in FILES().items():
import_to_bq = import_file(file_name, table_name)
clean_headers_task = clean_headers(file_name)
start_import >> list_files >> build_check(file_name) >> [clean_headers_task, no_file]
clean_headers_task >> import_to_bq >> archive_file(file_name)
也许是file_name
和filename
的区别?看起来任务 ID 使用 file_name
而 arg 是 filename
。这些函数都应该使用 filename
吗?
def build_check(filename):
return BranchPythonOperator(
task_id=f'check_for_{filename}'.replace(' ', '_'),
python_callable=check_file_exists,
op_kwargs={'filename': filename},
provide_context=True,
dag=dag
)
def check_file_exists(filename, **context):
xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')
if any(filename in s for s in xcom_value):
return f'clean_headers_for_{filename}'.replace(' ', '_')
else:
return 'no_file_found'
我在使用 Airflow 1.10 Python 分支操作员时遇到问题。我有一个扫描云存储桶的 dag,如果找到则处理文件。如果文件丢失,它会命中 no_file_found 虚拟运算符并完成,否则它会前进到一些解析步骤。
对于单个文件,此工作流程效果很好。当我为第二个文件添加相同的逻辑时,我的问题就出现了。目前 check_for_Post_Performance returns cleans_headers_for_gcm 任务,我完全不知道这是怎么发生的。从下面的大纲来看,它应该只有两条前进路径,clean_headers_Post_Perfromance 或 no_file_found.
我根据文件名列表动态创建这些任务。我遍历每个文件名并构建以下运算符:
def build_check(filename):
return BranchPythonOperator(
task_id=f'check_for_{file_name}'.replace(' ', '_'),
python_callable=check_file_exists,
op_kwargs={'filename': filename},
provide_context=True,
dag=dag
)
def check_file_exists(filename, **context):
xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')
if any(filename in s for s in xcom_value):
return f'clean_headers_for_{file_name}'.replace(' ', '_')
else:
return 'no_file_found'
我检查了呈现的任务模板以确认 'Post Performance' 已为文件名变量传递
但是在查看日志时,我看到以下内容:
[2021-12-02 20:15:56,742] {logging_mixin.py:120} INFO - Running <TaskInstance: example_dag.check_for_Post_Performance 2021-12-02T20:14:50.724084+00:00 [running]> on host 21d0393eb686
[2021-12-02 20:15:56,766] {python_operator.py:114} INFO - Done. Returned value was: clean_headers_for_GCM
[2021-12-02 20:15:56,767] {skipmixin.py:122} INFO - Following branch clean_headers_for_GCM
[2021-12-02 20:15:56,773] {skipmixin.py:158} INFO - Skipping tasks ['no_file_found', 'clean_headers_for_Post_Performance']
我最好的猜测是函数并没有像我认为的那样在每个循环中创建,或者某些触发规则使我感到困惑。我怎样才能让源列表中的每个文件彼此独立地到达 no_file_found 或 clean_headers 任务?
编辑 这是我用来从静态列表构建任务的代码:
for file_name, table_name in FILES().items():
import_to_bq = import_file(file_name, table_name)
clean_headers_task = clean_headers(file_name)
start_import >> list_files >> build_check(file_name) >> [clean_headers_task, no_file]
clean_headers_task >> import_to_bq >> archive_file(file_name)
也许是file_name
和filename
的区别?看起来任务 ID 使用 file_name
而 arg 是 filename
。这些函数都应该使用 filename
吗?
def build_check(filename):
return BranchPythonOperator(
task_id=f'check_for_{filename}'.replace(' ', '_'),
python_callable=check_file_exists,
op_kwargs={'filename': filename},
provide_context=True,
dag=dag
)
def check_file_exists(filename, **context):
xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')
if any(filename in s for s in xcom_value):
return f'clean_headers_for_{filename}'.replace(' ', '_')
else:
return 'no_file_found'