基于外部文件的气流中的动态任务
Dynamic tasks in airflow based on an external file
我正在从外部文件中读取元素列表并遍历元素以创建一系列任务。
例如,如果文件中有 2 个元素 - [A, B]。将有 2 个系列任务:
A1 -> A2 ..
B1 -> B2 ...
此读取元素逻辑不是任何任务的一部分,而是在 DAG 本身中。因此,Scheduler 在读取 DAG 文件时每天多次调用它。我只想在 DAG 运行时调用它。
想知道是否已经有针对此类用例的模式?
根据您的要求,如果您正在寻找的是避免多次读取文件,但您不介意多次从元数据数据库中读取,那么您可以改变您的方法以使用 Variables
作为动态创建任务的迭代源。
一个基本示例可以在 PythonOperator
中执行文件读取并设置 Variables
稍后您将使用它进行迭代(相同的可调用对象):
sample_file.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
任务定义:
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
Variable.set(key='list_of_cities',
value=data['cities'], serialize_json=True)
print('Loading Variable from file...')
def _say_hello(city_name):
print('hello from ' + city_name)
with DAG('dynamic_tasks_from_var', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
read_file = PythonOperator(
task_id='read_file',
python_callable=_read_file
)
然后您可以读取该变量并创建动态任务。 (设置 default_var
很重要)。 TaskGroup
是可选的。
# Top-level code
updated_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
print(f'Updated LIST: {updated_list}')
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
for index, city in enumerate(updated_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_say_hello,
op_kwargs={'city_name': city}
)
# DAG level dependencies
read_file >> dynamic_tasks_group
在 Scheduler 日志中,您只会找到:
INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
Dag 图视图:
通过这种方法,top-level code, hence read by the Scheduler continuously, is the call to Variable.get()
method. If you need to read from many variables, it's important to remember that it's recommended to store them in one single JSON value to avoid constantly create connections to the metadata database (example in this article)。
更新:
- 至于 11-2021,这种方法被认为是一种“快速而肮脏”的解决方案。
- 有用吗?是的,完全是。它是生产质量代码吗?号
- 有什么问题吗?每次调度程序解析文件时都会访问 DB,默认情况下每 30 秒访问一次,与您的 DAG 执行无关。有关 Airflow 最佳实践的完整详细信息,top-level code。
- 如何改进?考虑有关 dynamic DAG generation 的任何推荐方法是否适用于您的需求。
我正在从外部文件中读取元素列表并遍历元素以创建一系列任务。
例如,如果文件中有 2 个元素 - [A, B]。将有 2 个系列任务:
A1 -> A2 ..
B1 -> B2 ...
此读取元素逻辑不是任何任务的一部分,而是在 DAG 本身中。因此,Scheduler 在读取 DAG 文件时每天多次调用它。我只想在 DAG 运行时调用它。
想知道是否已经有针对此类用例的模式?
根据您的要求,如果您正在寻找的是避免多次读取文件,但您不介意多次从元数据数据库中读取,那么您可以改变您的方法以使用 Variables
作为动态创建任务的迭代源。
一个基本示例可以在 PythonOperator
中执行文件读取并设置 Variables
稍后您将使用它进行迭代(相同的可调用对象):
sample_file.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
任务定义:
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
Variable.set(key='list_of_cities',
value=data['cities'], serialize_json=True)
print('Loading Variable from file...')
def _say_hello(city_name):
print('hello from ' + city_name)
with DAG('dynamic_tasks_from_var', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
read_file = PythonOperator(
task_id='read_file',
python_callable=_read_file
)
然后您可以读取该变量并创建动态任务。 (设置 default_var
很重要)。 TaskGroup
是可选的。
# Top-level code
updated_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
print(f'Updated LIST: {updated_list}')
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
for index, city in enumerate(updated_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_say_hello,
op_kwargs={'city_name': city}
)
# DAG level dependencies
read_file >> dynamic_tasks_group
在 Scheduler 日志中,您只会找到:
INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
Dag 图视图:
通过这种方法,top-level code, hence read by the Scheduler continuously, is the call to Variable.get()
method. If you need to read from many variables, it's important to remember that it's recommended to store them in one single JSON value to avoid constantly create connections to the metadata database (example in this article)。
更新:
- 至于 11-2021,这种方法被认为是一种“快速而肮脏”的解决方案。
- 有用吗?是的,完全是。它是生产质量代码吗?号
- 有什么问题吗?每次调度程序解析文件时都会访问 DB,默认情况下每 30 秒访问一次,与您的 DAG 执行无关。有关 Airflow 最佳实践的完整详细信息,top-level code。
- 如何改进?考虑有关 dynamic DAG generation 的任何推荐方法是否适用于您的需求。