调度程序未添加动态 dag
Dynamic dags not getting added by scheduler
我正在尝试创建动态 DAG,然后将它们发送到调度程序。我尝试了 https://www.astronomer.io/guides/dynamically-generating-dags/ 中的参考,效果很好。我在下面的代码中对其进行了一些更改。在调试问题时需要帮助。
我试过了
1. 测试 运行 文件。 Dag 被执行并且 globals() 正在打印所有 DAGs 对象。但不知何故没有在 list_dags 或 UI
中列出
from datetime import datetime, timedelta
import requests
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
def create_dag(dag_id,
dag_number,
default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id,
schedule_interval="@hourly",
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py,
dag_number=dag_number)
return dag
def fetch_new_dags(**kwargs):
for n in range(1, 10):
print("=====================START=========\n")
dag_id = "abcd_" + str(n)
print (dag_id)
print("\n")
globals()[dag_id] = create_dag(dag_id, n, default_args)
print(globals())
default_args = {
'owner': 'diablo_admin',
'depends_on_past': False,
'start_date': datetime(2019, 8, 8),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'trigger_rule': 'none_skipped'
#'schedule_interval': '0 * * * *'
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('testDynDags', default_args=default_args, schedule_interval='*/1 * * * *')
#schedule_interval='*/1 * * * *'
check_for_dags = PythonOperator(dag=dag,
task_id='tst_dyn_dag',
provide_context=True,
python_callable=fetch_new_dags
)
check_for_dags
预计动态创建 10 个 DAG 并添加到调度程序。
我想执行以下操作可以解决问题
- 完全删除全局
testDynDags
dag 和 tst_dyn_dags
任务(实例化和调用)
- 在全局范围内使用必要的参数调用您的
fetch_new_dags(..)
方法
说明
- 动态 dags/tasks 仅仅意味着你在编写 dag-definition 文件时有一个定义良好的逻辑 可以帮助创建具有已知结构的任务/dags预定义的时尚。
- 您无法在运行时确定 DAG 的结构(任务执行)。因此,例如,如果上游任务返回整数值 n,则您不能将 n 个相同的任务添加到 DAG。但是你可以遍历包含 n 个段的 YAML 文件并生成 n 个任务/dags。
很明显,将 dag 生成代码包装在 Airflow 任务本身中是没有意义的。
UPDATE-1
根据评论中的指示,我推断该要求要求您修改将输入(要创建多少 dag 或任务)提供给 DAG/任务生成脚本的外部源。虽然这确实是一个复杂的用例,但实现此目的的一种简单方法是创建 2 个独立的 DAG。
- 一个 dag 偶尔运行一次并生成存储在外部资源中的输入,如 Airflow Variable(或任何其他外部存储,如文件/S3/数据库等)
- 第二个 DAG 是通过读取第一个 DAG 编写的相同数据源以编程方式构建的
中获取灵感
我正在尝试创建动态 DAG,然后将它们发送到调度程序。我尝试了 https://www.astronomer.io/guides/dynamically-generating-dags/ 中的参考,效果很好。我在下面的代码中对其进行了一些更改。在调试问题时需要帮助。
我试过了 1. 测试 运行 文件。 Dag 被执行并且 globals() 正在打印所有 DAGs 对象。但不知何故没有在 list_dags 或 UI
中列出from datetime import datetime, timedelta
import requests
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
def create_dag(dag_id,
dag_number,
default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id,
schedule_interval="@hourly",
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py,
dag_number=dag_number)
return dag
def fetch_new_dags(**kwargs):
for n in range(1, 10):
print("=====================START=========\n")
dag_id = "abcd_" + str(n)
print (dag_id)
print("\n")
globals()[dag_id] = create_dag(dag_id, n, default_args)
print(globals())
default_args = {
'owner': 'diablo_admin',
'depends_on_past': False,
'start_date': datetime(2019, 8, 8),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'trigger_rule': 'none_skipped'
#'schedule_interval': '0 * * * *'
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('testDynDags', default_args=default_args, schedule_interval='*/1 * * * *')
#schedule_interval='*/1 * * * *'
check_for_dags = PythonOperator(dag=dag,
task_id='tst_dyn_dag',
provide_context=True,
python_callable=fetch_new_dags
)
check_for_dags
预计动态创建 10 个 DAG 并添加到调度程序。
我想执行以下操作可以解决问题
- 完全删除全局
testDynDags
dag 和tst_dyn_dags
任务(实例化和调用) - 在全局范围内使用必要的参数调用您的
fetch_new_dags(..)
方法
说明
- 动态 dags/tasks 仅仅意味着你在编写 dag-definition 文件时有一个定义良好的逻辑 可以帮助创建具有已知结构的任务/dags预定义的时尚。
- 您无法在运行时确定 DAG 的结构(任务执行)。因此,例如,如果上游任务返回整数值 n,则您不能将 n 个相同的任务添加到 DAG。但是你可以遍历包含 n 个段的 YAML 文件并生成 n 个任务/dags。
很明显,将 dag 生成代码包装在 Airflow 任务本身中是没有意义的。
UPDATE-1
根据评论中的指示,我推断该要求要求您修改将输入(要创建多少 dag 或任务)提供给 DAG/任务生成脚本的外部源。虽然这确实是一个复杂的用例,但实现此目的的一种简单方法是创建 2 个独立的 DAG。
- 一个 dag 偶尔运行一次并生成存储在外部资源中的输入,如 Airflow Variable(或任何其他外部存储,如文件/S3/数据库等)
- 第二个 DAG 是通过读取第一个 DAG 编写的相同数据源以编程方式构建的