气流 PythonBranchOperator returns 无效值

Airflow PythonBranchOperator returns invalid value

我在使用 Airflow 1.10 Python 分支操作员时遇到问题。我有一个扫描云存储桶的 dag,如果找到则处理文件。如果文件丢失,它会命中 no_file_found 虚拟运算符并完成,否则它会前进到一些解析步骤。

对于单个文件,此工作流程效果很好。当我为第二个文件添加相同的逻辑时,我的问题就出现了。目前 check_for_Post_Performance returns cleans_headers_for_gcm 任务,我完全不知道这是怎么发生的。从下面的大纲来看,它应该只有两条前进路径,clean_headers_Post_Perfromance 或 no_file_found.

我根据文件名列表动态创建这些任务。我遍历每个文件名并构建以下运算符:

def build_check(filename):
    return BranchPythonOperator(
        task_id=f'check_for_{file_name}'.replace(' ', '_'),
        python_callable=check_file_exists,
        op_kwargs={'filename': filename},
        provide_context=True,
        dag=dag
   )

def check_file_exists(filename, **context):
    xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')
    if any(filename in s for s in xcom_value):
        return f'clean_headers_for_{file_name}'.replace(' ', '_')   
    else:
        return 'no_file_found'

我检查了呈现的任务模板以确认 'Post Performance' 已为文件名变量传递

但是在查看日志时,我看到以下内容:

[2021-12-02 20:15:56,742] {logging_mixin.py:120} INFO - Running <TaskInstance: example_dag.check_for_Post_Performance 2021-12-02T20:14:50.724084+00:00 [running]> on host 21d0393eb686
[2021-12-02 20:15:56,766] {python_operator.py:114} INFO - Done. Returned value was: clean_headers_for_GCM
[2021-12-02 20:15:56,767] {skipmixin.py:122} INFO - Following branch clean_headers_for_GCM
[2021-12-02 20:15:56,773] {skipmixin.py:158} INFO - Skipping tasks ['no_file_found', 'clean_headers_for_Post_Performance']

我最好的猜测是函数并没有像我认为的那样在每个循环中创建,或者某些触发规则使我感到困惑。我怎样才能让源列表中的每个文件彼此独立地到达 no_file_found 或 clean_headers 任务?

编辑 这是我用来从静态列表构建任务的代码:

for file_name, table_name in FILES().items():
    import_to_bq = import_file(file_name, table_name)
    clean_headers_task =  clean_headers(file_name)

    start_import >> list_files >> build_check(file_name) >> [clean_headers_task, no_file] 
    clean_headers_task >> import_to_bq >> archive_file(file_name)

也许是file_namefilename的区别?看起来任务 ID 使用 file_name 而 arg 是 filename。这些函数都应该使用 filename 吗?

def build_check(filename):
    return BranchPythonOperator(
        task_id=f'check_for_{filename}'.replace(' ', '_'),
        python_callable=check_file_exists,
        op_kwargs={'filename': filename},
        provide_context=True,
        dag=dag
   )

def check_file_exists(filename, **context):
    xcom_value = context['ti'].xcom_pull(task_ids=f'list_files')
    if any(filename in s for s in xcom_value):
        return f'clean_headers_for_{filename}'.replace(' ', '_')   
    else:
        return 'no_file_found'