Airflow - 在循环中处理 XCOM

Airflow - Handle XCOM inside a loop

我有以下需求。 一个 table 包含 10000 条记录。我需要从中获取 1000,进行一些处理并发送到 HTTP 端点。然后获取下一个 1000 并执行相同的操作。这 1000 个块可以独立处理。 所以我想出了下面的DAG。

default_args = {
    "owner": "airflow",
    "depends_on_past": True,
    "wait_for_downstream":True,
    "max_active_runs":1,
    "start_date": datetime(2020, 10, 8),
    "catchup": False
}

dag = DAG("dag_id", default_args=default_args, schedule_interval=timedelta(1))

limit = 10000
cur_size = 0

while limit>cur_size:
    def get_records(**kwargs):
        ti = kwargs['ti']
        xcom = ti.xcom_pull(task_ids='mysql_read_data_'+str(cur_size))
        data = [
            {
                "attributes": {
                    "att1": x,
                    "att2": y
                }
            }
            for x, y in xcom
        ]

        data = json.dumps(data)
    
        ti.xcom_push(key='data_to_cus_'+str(cur_size), value=data)

        return data



    mysql_read_data = MySQLReadOperator(dag=dag,
                                        mysql_conn_id='temper_flat_old',
                                        task_id='mysql_read_data_'+str(cur_size),
                                        params={'limit': 100, 'offset': cur_size},
                                        sql="sql/client_data.sql",
                                         trigger_rule="all_done",
                                         )


    python_task = PythonOperator(
        task_id='python_func_task_'+str(cur_size),
        dag=dag,
        python_callable=get_records,
        provide_context=True,
        trigger_rule="all_done",
    )

    send_to_endpoint = CustomOperator(
        dag=dag,
        task_id='custom_op_'+str(cur_size),
        data_xcom_task_id=python_task.task_id,
        data_xcom_key='data_to_cus_'+str(cur_size),
        trigger_rule="all_done",

    )


    mysql_read_data >> python_task >> send_to_endpoint

    cur_size += 1000

除一件事外,这工作正常。我在这一行 params={'limit': 100, 'offset': cur_size}, 中进行分页,其中 cur_size 由迭代设置。但不幸的是,第二次迭代在完成第一次迭代的任务之前就已经开始了。然后,当我在 get_records() 中执行 xcom_pull 时, cur_size 是错误的。还有 data_xcom_key='data_to_kva_'+str(cur_size),.

要么我需要阻止它执行下一次迭代,直到它完全完成当前迭代,要么必须有办法正确地让 task_id 正确地从 xcom 推或拉 因为现在我得到了错误

ERROR - 'NoneType' object is not iterable

原因是我用来引用 pull form xcom 的任务 id 不同

我该如何解决这个问题?谢谢

我认为主要问题是在 while 循环中定义函数。发生的事情是 Airflow 每次解析文件时都会呈现该函数,这通常每隔几个 minutes/seconds 发生一次。 PyhtonOperator 然后调用该函数,而不管 cur_size.

你真正想要的是将参数从 PythonOperator 传递给函数,你可以通过 op_kwargs:

def get_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='mysql_read_data_'+str(kwargs['custom_cur_size']))
    data = [
        {
            "attributes": {
                "att1": x,
                "att2": y
            }
        }
        for x, y in xcom
    ]

    data = json.dumps(data)

    ti.xcom_push(key='data_to_cus_'+str(kwargs['custom_cur_size']), value=data)

    return data

while limit>cur_size:

    mysql_read_data = MySQLReadOperator(dag=dag,
                                        mysql_conn_id='temper_flat_old',
                                        task_id='mysql_read_data_'+str(cur_size),
                                        params={'limit': 100, 'offset': cur_size},
                                        sql="sql/client_data.sql",
                                         trigger_rule="all_done",
                                         )


    python_task = PythonOperator(
        task_id='python_func_task_'+str(cur_size),
        dag=dag,
        python_callable=get_records,
        provide_context=True,
        op_kwargs={'custom_cur_size': str(cur_size)},
        trigger_rule="all_done",
    )

    send_to_endpoint = CustomOperator(
        dag=dag,
        task_id='custom_op_'+str(cur_size),
        data_xcom_task_id=python_task.task_id,
        data_xcom_key='data_to_cus_'+str(cur_size),
        trigger_rule="all_done",

    )


    mysql_read_data >> python_task >> send_to_endpoint

    cur_size += 1000