使用DB动态生成airflow任务
Use DB to generate airflow tasks dynamically
我想要 运行 像这样的 airflow dag ->
- 我有 2 个 airflow worker W1 和 W2。
- 在第 1 周,我安排了一个任务 (W1-1),但在第 2 周,我想创建 X 个任务 (W2-1、W2-2 ... W2-X)。
- 每个任务的数字 X 和 bash 命令将从数据库调用中派生。
- 工作人员 W2 的所有任务应在 W1 完成后 运行 并行。
这是我的代码
dag = DAG('deploy_single', catchup=False, default_args=default_args, schedule_interval='16 15 * * *')
t1 = BashOperator(
task_id='dummy_task',
bash_command='echo hi > /tmp/hi',
queue='W1_queue',
dag=dag)
get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"
db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)
cursor = connection.cursor()
cursor.execute(get_all_engines)
records = cursor.fetchall()
i = 1
for record in records:
t = BashOperator(
task_id='script_test_'+str(i),
bash_command="{full_command} ".format(full_command=str(record[0])),
queue=str(record[1]),
dag=dag)
t.set_upstream(t1)
i += 1
cursor.close()
connection.close()
然而,当我运行这样做时,W1 上的任务成功完成但W2 上的所有任务都失败了。在气流 UI 中,我可以看到它可以解决正确数量的任务(在本例中为 10 个),但是这 10 个中的每一个都失败了。
查看日志,我看到在 W2(在另一台机器上)上,airflow 找不到 db_creds.json
文件。
我不想向 W2 提供数据库信用文件。
我的问题是在这种情况下如何动态创建气流任务?
基本上我想 运行 气流服务器上的数据库查询,并根据该查询的结果将任务分配给一个或多个工作人员。数据库将包含有关哪些引擎处于活动状态等的更新信息,我希望 DAG 反映这一点。从日志来看,似乎每个工作人员 运行 都在进行数据库查询。向每个工作人员提供对数据库的访问权不是一种选择。
一种方法是将信息存储在 Airflow Variable 中。
您可以在变量中获取动态生成 DAG(和必要的配置)所需的信息,并让 W2 从那里访问它。
变量是一个airflow model,可用于存储所有任务都可以访问的静态信息(没有关联时间戳的信息)。
谢谢@viraj-parekh 和@cwurtz。
经过反复试验,找到了在这种情况下使用气流变量的正确方法。
步骤 1) 我们创建另一个名为 gen_var.py
的脚本并将其放在 dag 文件夹中。这样,调度程序将选取并生成变量。如果生成变量的代码在 deploy_single
dag 中,那么我们 运行 会遇到相同的依赖问题,因为 worker 也会尝试处理 dag。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
import json
import psycopg2
from airflow.models import Variable
from psycopg2.extensions import AsIs
get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"
db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)
cursor = connection.cursor()
cursor.execute(get_all_engines)
records = cursor.fetchall()
hosts = {}
i = 1
for record in records:
comm_dict = {}
comm_dict['full_command'] = str(record[0])
comm_dict['queue_name'] = str(record[1])
hosts[i] = comm_dict
i += 1
cursor.close()
connection.close()
Variable.set("hosts",hosts,serialize_json=True)
注意对 serialize_json
的调用。 Airflow 将尝试将变量存储为字符串。如果你想把它存储为字典,那么使用serialize_json=True
。 Airflow 仍会通过 json.dumps
将其存储为字符串
步骤 2) 简化 dag 并像这样调用此 "hosts"
变量(现在反序列化以取回字典)-
hoztz = Variable.get("hosts",deserialize_json=True)
for key in hoztz:
host = hoztz.get(key)
t = BashOperator(
task_id='script_test_'+str(key),
bash_command="{full_command} ".format(full_command=str(host.get('full_command'))),
queue=str(host.get('queue_name')),
dag=dag)
t.set_upstream(t1)
希望对其他人有所帮助。
我想要 运行 像这样的 airflow dag ->
- 我有 2 个 airflow worker W1 和 W2。
- 在第 1 周,我安排了一个任务 (W1-1),但在第 2 周,我想创建 X 个任务 (W2-1、W2-2 ... W2-X)。
- 每个任务的数字 X 和 bash 命令将从数据库调用中派生。
- 工作人员 W2 的所有任务应在 W1 完成后 运行 并行。
这是我的代码
dag = DAG('deploy_single', catchup=False, default_args=default_args, schedule_interval='16 15 * * *')
t1 = BashOperator(
task_id='dummy_task',
bash_command='echo hi > /tmp/hi',
queue='W1_queue',
dag=dag)
get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"
db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)
cursor = connection.cursor()
cursor.execute(get_all_engines)
records = cursor.fetchall()
i = 1
for record in records:
t = BashOperator(
task_id='script_test_'+str(i),
bash_command="{full_command} ".format(full_command=str(record[0])),
queue=str(record[1]),
dag=dag)
t.set_upstream(t1)
i += 1
cursor.close()
connection.close()
然而,当我运行这样做时,W1 上的任务成功完成但W2 上的所有任务都失败了。在气流 UI 中,我可以看到它可以解决正确数量的任务(在本例中为 10 个),但是这 10 个中的每一个都失败了。
查看日志,我看到在 W2(在另一台机器上)上,airflow 找不到 db_creds.json
文件。
我不想向 W2 提供数据库信用文件。
我的问题是在这种情况下如何动态创建气流任务? 基本上我想 运行 气流服务器上的数据库查询,并根据该查询的结果将任务分配给一个或多个工作人员。数据库将包含有关哪些引擎处于活动状态等的更新信息,我希望 DAG 反映这一点。从日志来看,似乎每个工作人员 运行 都在进行数据库查询。向每个工作人员提供对数据库的访问权不是一种选择。
一种方法是将信息存储在 Airflow Variable 中。
您可以在变量中获取动态生成 DAG(和必要的配置)所需的信息,并让 W2 从那里访问它。
变量是一个airflow model,可用于存储所有任务都可以访问的静态信息(没有关联时间戳的信息)。
谢谢@viraj-parekh 和@cwurtz。
经过反复试验,找到了在这种情况下使用气流变量的正确方法。
步骤 1) 我们创建另一个名为 gen_var.py
的脚本并将其放在 dag 文件夹中。这样,调度程序将选取并生成变量。如果生成变量的代码在 deploy_single
dag 中,那么我们 运行 会遇到相同的依赖问题,因为 worker 也会尝试处理 dag。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
import json
import psycopg2
from airflow.models import Variable
from psycopg2.extensions import AsIs
get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"
db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)
cursor = connection.cursor()
cursor.execute(get_all_engines)
records = cursor.fetchall()
hosts = {}
i = 1
for record in records:
comm_dict = {}
comm_dict['full_command'] = str(record[0])
comm_dict['queue_name'] = str(record[1])
hosts[i] = comm_dict
i += 1
cursor.close()
connection.close()
Variable.set("hosts",hosts,serialize_json=True)
注意对 serialize_json
的调用。 Airflow 将尝试将变量存储为字符串。如果你想把它存储为字典,那么使用serialize_json=True
。 Airflow 仍会通过 json.dumps
步骤 2) 简化 dag 并像这样调用此 "hosts"
变量(现在反序列化以取回字典)-
hoztz = Variable.get("hosts",deserialize_json=True)
for key in hoztz:
host = hoztz.get(key)
t = BashOperator(
task_id='script_test_'+str(key),
bash_command="{full_command} ".format(full_command=str(host.get('full_command'))),
queue=str(host.get('queue_name')),
dag=dag)
t.set_upstream(t1)
希望对其他人有所帮助。