Airflow - 在循环中处理 XCOM
Airflow - Handle XCOM inside a loop
我有以下需求。
一个 table 包含 10000 条记录。我需要从中获取 1000,进行一些处理并发送到 HTTP 端点。然后获取下一个 1000 并执行相同的操作。这 1000 个块可以独立处理。
所以我想出了下面的DAG。
default_args = {
"owner": "airflow",
"depends_on_past": True,
"wait_for_downstream":True,
"max_active_runs":1,
"start_date": datetime(2020, 10, 8),
"catchup": False
}
dag = DAG("dag_id", default_args=default_args, schedule_interval=timedelta(1))
limit = 10000
cur_size = 0
while limit>cur_size:
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='mysql_read_data_'+str(cur_size))
data = [
{
"attributes": {
"att1": x,
"att2": y
}
}
for x, y in xcom
]
data = json.dumps(data)
ti.xcom_push(key='data_to_cus_'+str(cur_size), value=data)
return data
mysql_read_data = MySQLReadOperator(dag=dag,
mysql_conn_id='temper_flat_old',
task_id='mysql_read_data_'+str(cur_size),
params={'limit': 100, 'offset': cur_size},
sql="sql/client_data.sql",
trigger_rule="all_done",
)
python_task = PythonOperator(
task_id='python_func_task_'+str(cur_size),
dag=dag,
python_callable=get_records,
provide_context=True,
trigger_rule="all_done",
)
send_to_endpoint = CustomOperator(
dag=dag,
task_id='custom_op_'+str(cur_size),
data_xcom_task_id=python_task.task_id,
data_xcom_key='data_to_cus_'+str(cur_size),
trigger_rule="all_done",
)
mysql_read_data >> python_task >> send_to_endpoint
cur_size += 1000
除一件事外,这工作正常。我在这一行 params={'limit': 100, 'offset': cur_size},
中进行分页,其中 cur_size
由迭代设置。但不幸的是,第二次迭代在完成第一次迭代的任务之前就已经开始了。然后,当我在 get_records()
中执行 xcom_pull 时, cur_size
是错误的。还有 data_xcom_key='data_to_kva_'+str(cur_size),
.
要么我需要阻止它执行下一次迭代,直到它完全完成当前迭代,要么必须有办法正确地让 task_id
正确地从 xcom
推或拉
因为现在我得到了错误
ERROR - 'NoneType' object is not iterable
原因是我用来引用 pull form xcom 的任务 id 不同
我该如何解决这个问题?谢谢
我认为主要问题是在 while 循环中定义函数。发生的事情是 Airflow 每次解析文件时都会呈现该函数,这通常每隔几个 minutes/seconds 发生一次。 PyhtonOperator 然后调用该函数,而不管 cur_size.
你真正想要的是将参数从 PythonOperator
传递给函数,你可以通过 op_kwargs
:
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='mysql_read_data_'+str(kwargs['custom_cur_size']))
data = [
{
"attributes": {
"att1": x,
"att2": y
}
}
for x, y in xcom
]
data = json.dumps(data)
ti.xcom_push(key='data_to_cus_'+str(kwargs['custom_cur_size']), value=data)
return data
while limit>cur_size:
mysql_read_data = MySQLReadOperator(dag=dag,
mysql_conn_id='temper_flat_old',
task_id='mysql_read_data_'+str(cur_size),
params={'limit': 100, 'offset': cur_size},
sql="sql/client_data.sql",
trigger_rule="all_done",
)
python_task = PythonOperator(
task_id='python_func_task_'+str(cur_size),
dag=dag,
python_callable=get_records,
provide_context=True,
op_kwargs={'custom_cur_size': str(cur_size)},
trigger_rule="all_done",
)
send_to_endpoint = CustomOperator(
dag=dag,
task_id='custom_op_'+str(cur_size),
data_xcom_task_id=python_task.task_id,
data_xcom_key='data_to_cus_'+str(cur_size),
trigger_rule="all_done",
)
mysql_read_data >> python_task >> send_to_endpoint
cur_size += 1000
我有以下需求。 一个 table 包含 10000 条记录。我需要从中获取 1000,进行一些处理并发送到 HTTP 端点。然后获取下一个 1000 并执行相同的操作。这 1000 个块可以独立处理。 所以我想出了下面的DAG。
default_args = {
"owner": "airflow",
"depends_on_past": True,
"wait_for_downstream":True,
"max_active_runs":1,
"start_date": datetime(2020, 10, 8),
"catchup": False
}
dag = DAG("dag_id", default_args=default_args, schedule_interval=timedelta(1))
limit = 10000
cur_size = 0
while limit>cur_size:
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='mysql_read_data_'+str(cur_size))
data = [
{
"attributes": {
"att1": x,
"att2": y
}
}
for x, y in xcom
]
data = json.dumps(data)
ti.xcom_push(key='data_to_cus_'+str(cur_size), value=data)
return data
mysql_read_data = MySQLReadOperator(dag=dag,
mysql_conn_id='temper_flat_old',
task_id='mysql_read_data_'+str(cur_size),
params={'limit': 100, 'offset': cur_size},
sql="sql/client_data.sql",
trigger_rule="all_done",
)
python_task = PythonOperator(
task_id='python_func_task_'+str(cur_size),
dag=dag,
python_callable=get_records,
provide_context=True,
trigger_rule="all_done",
)
send_to_endpoint = CustomOperator(
dag=dag,
task_id='custom_op_'+str(cur_size),
data_xcom_task_id=python_task.task_id,
data_xcom_key='data_to_cus_'+str(cur_size),
trigger_rule="all_done",
)
mysql_read_data >> python_task >> send_to_endpoint
cur_size += 1000
除一件事外,这工作正常。我在这一行 params={'limit': 100, 'offset': cur_size},
中进行分页,其中 cur_size
由迭代设置。但不幸的是,第二次迭代在完成第一次迭代的任务之前就已经开始了。然后,当我在 get_records()
中执行 xcom_pull 时, cur_size
是错误的。还有 data_xcom_key='data_to_kva_'+str(cur_size),
.
要么我需要阻止它执行下一次迭代,直到它完全完成当前迭代,要么必须有办法正确地让 task_id
正确地从 xcom
推或拉
因为现在我得到了错误
ERROR - 'NoneType' object is not iterable
原因是我用来引用 pull form xcom 的任务 id 不同
我该如何解决这个问题?谢谢
我认为主要问题是在 while 循环中定义函数。发生的事情是 Airflow 每次解析文件时都会呈现该函数,这通常每隔几个 minutes/seconds 发生一次。 PyhtonOperator 然后调用该函数,而不管 cur_size.
你真正想要的是将参数从 PythonOperator
传递给函数,你可以通过 op_kwargs
:
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='mysql_read_data_'+str(kwargs['custom_cur_size']))
data = [
{
"attributes": {
"att1": x,
"att2": y
}
}
for x, y in xcom
]
data = json.dumps(data)
ti.xcom_push(key='data_to_cus_'+str(kwargs['custom_cur_size']), value=data)
return data
while limit>cur_size:
mysql_read_data = MySQLReadOperator(dag=dag,
mysql_conn_id='temper_flat_old',
task_id='mysql_read_data_'+str(cur_size),
params={'limit': 100, 'offset': cur_size},
sql="sql/client_data.sql",
trigger_rule="all_done",
)
python_task = PythonOperator(
task_id='python_func_task_'+str(cur_size),
dag=dag,
python_callable=get_records,
provide_context=True,
op_kwargs={'custom_cur_size': str(cur_size)},
trigger_rule="all_done",
)
send_to_endpoint = CustomOperator(
dag=dag,
task_id='custom_op_'+str(cur_size),
data_xcom_task_id=python_task.task_id,
data_xcom_key='data_to_cus_'+str(cur_size),
trigger_rule="all_done",
)
mysql_read_data >> python_task >> send_to_endpoint
cur_size += 1000