如何从 google 云平台上的 dags 主文件夹中气流 loads/updates DagBag?

How airflow loads/updates DagBag from dags home folder on google cloud platform?

请不要对我的回答投反对票。如果需要,我会更新并更正我的话。我已经完成了家庭作业研究。我是个新手,所以想了解一下。

我想了解 Google 云平台上的气流如何获取从 dags 主文件夹到 UI 的更改。另外请帮我处理我的 dags 安装脚本。我已经阅读了很多答案和书籍。书 link 是 here

我试着从第 69 页找出我的答案,上面写着

3.11 Scheduling & Triggers The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) inspects active tasks to see whether they can be triggered.

我从这本书中了解到调度程序会定期从 dags 主文件夹中获取更改。 (正确吗?)

我也阅读了多个关于堆栈溢出的答案,我发现这个很有用

但答案仍然不包含 dag 主文件夹中来自 script.py 的 dagbag creation/updation 执行此操作的进程。如何感知变化。

请帮助我使用 dags 安装脚本。 我们已经创建了一个通用的 python 脚本,它通过 reading/iterating 通过配置文件动态创建 dags。

目录结构如下

/dags/workflow/
/dags/workflow/config/dag_a.json
/dags/workflow/config/dag_b.json
/dags/workflow/task_a_with_single_operator.py
/dags/workflow/task_b_with_single_operator.py
/dags/dag_creater.py

执行流程dag_creater.py如下:-

 1. Iterate in dags/workflow/config folder get the Config JSON file and
    read variable dag_id.
 2. create Parent_dag = DAG(dag_id=dag_id,
    start_date=start_date, schedule_interval=schedule_interval,
                             default_args=default_args, catchup=False) 
 3. Read tasks with dependencies of that dag_id from config json file
    (example :- [[a,[]],[b,[a]],[c,[b]]]) and code it as task_a >>
    task_b >> task_c

这样就创建了dag。一切正常。在 UI 和 运行 上也可以看到 Dags。

但问题是,我的 dag 创建脚本每次都是 运行ning。即使在每个任务日志中,我也会看到所有 dags 的日志。我希望这个脚本 运行 一次。只是为了填写元数据中的条目。我无法理解为什么每次都是运行ning。 请让我明白过程。

我知道 airflow initdb 是 运行 一旦我们第一次设置元数据。所以这并不是一直都在做这个更新。

Please Note: I can't type real code as that is the restriction from my organization. However if asked, i will provide more information.

Airflow Scheduler is actually continuously running in Airflow runtime environment as a main contributor for monitoring changes in DAG folder and triggering the relevant DAG tasks residing in this folder. The main settings for Airflow Scheduler service can be found in airflow.cfg file, essentially the heart beat intervals 有效影响一般 DAG 任务维护。

但是,特定任务的执行方式是根据 Airflow 配置中的 Executor's 模型定义的。

为了存储可用于 Airflow 运行时间环境的 DAG,GCP Composer 使用 Cloud Storage,实现特定文件夹 structure, synchronizing any object arriving to /dags folder with *.py extension be verified if it contains the DAG definition

如果您希望 运行 在 Airflow 运行 时间内传播 DAG 脚本,那么在这个特定的用例中,我建议您查看 PythonOperator, using it in the separate DAG to invoke and execute your custom generic Python code with guarantees scheduling it only once a time. You can check out this Stack thread 的实现细节。