Airflow: "already registered for DAG" when dynamically generating tasks

I recently started using the TaskFlow API in some of my DAG files where tasks are generated dynamically, and I began noticing (a lot of) warning messages in the logs. Here is a dummy DAG file that produces the message:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup


NUMS = [1, 2]

default_args = {
    "owner": "henrique",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=15),
}


def print_id(num: int):
    print(num)
    return num


def run_tests():
    # Dynamically create one task per entry in NUMS: task(task_id=...)
    # wraps the plain print_id function into a TaskFlow task, and calling
    # it with (i) instantiates the task and returns its XComArg.
    results = []
    for i in NUMS:
        result = task(task_id=f"run_{i}")(print_id)(i)
        results.append(result)

    return results


@task()
def agg(results):
    print(results)


@dag(
    "test_tg",
    default_args=default_args,
    schedule_interval="@once",
    start_date=days_ago(1),
    max_active_runs=1,
)
def test_supervisor():
    task_start = DummyOperator(task_id="task_start")
    task_end = DummyOperator(task_id="task_end")
    groups = []
    for i in NUMS:
        # One TaskGroup per entry; task ids are prefixed with the group id
        # (e.g. 1_num_group.run_1), so they stay unique across groups.
        with TaskGroup(group_id=f"{i}_num_group") as tg:
            results = run_tests()
            # Passing the XComArgs into agg also wires run_* >> agg.
            aggregation = agg(results)

            groups.append(tg)

    task_start >> groups >> task_end


data_dag = test_supervisor()

When running this DAG I started getting a lot of warning messages like the following:

"[2021-08-24 09:46:46,438] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 2_num_group.agg>, 2_num_group.run_2 already registered for DAG: test_tg"
"[2021-08-24 09:46:46,438] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 2_num_group.run_2>, 2_num_group.agg already registered for DAG: test_tg"
"[2021-08-24 09:46:46,437] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 2_num_group.run_1>, 2_num_group.agg already registered for DAG: test_tg"
"[2021-08-24 09:46:46,413] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 1_num_group.agg>, 1_num_group.run_1 already registered for DAG: test_tg"
....

These messages keep appearing even when the DAG is not running and even while it is paused.
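My guess is that the warnings come from parsing rather than execution, since the scheduler keeps re-parsing the DAG file. A minimal way to check that (the dags path here is just my local setup) is to build a DagBag by hand and watch the log output:

from airflow.models import DagBag

# Building the DagBag parses every DAG file, which is when task
# dependencies get registered -- so any "already registered" warnings
# appear here even though nothing is executed.
dagbag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
print(dagbag.import_errors)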

Am I doing something wrong when creating the tasks?

Thanks in advance

I tried your code and it works fine; I don't get any of the warnings you mentioned. I'm running Airflow v2.1.2 using the official docker-compose setup.

I found some issues in the Airflow repo related to the message you're getting in older versions (pr, pr), but these should be resolved by now. Try upgrading to the latest version of Airflow; that should fix the problem.
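To confirm the upgrade actually took effect in the environment that parses your DAGs, a quick check (just a sketch) is to print the installed version:

import airflow

# Should match the version you upgraded to; the "airflow version"
# CLI command reports the same thing.
print(airflow.__version__)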

Edit:

Here is what I got after copying and pasting your code into my running Airflow instance:

Graph view: (screenshot)

Logs: (screenshot)

Output of airflow dags test test_tg 2021-08-24:

[2021-08-24 15:59:19,247] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags
[2021-08-24 15:59:19,865] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.task_start 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:24,991] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=task_start
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[2021-08-24 15:59:25,003] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=task_start, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155925
[2021-08-24 15:59:25,043] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:25,083] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 7 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2021-08-24 15:59:25,122] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.run_1 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,164] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.run_2 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,241] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.run_1 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,279] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.run_2 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:29,814] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.run_1
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
1
[2021-08-24 15:59:29,849] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.run_1, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155929
[2021-08-24 15:59:29,876] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:29,912] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.run_2
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
2
[2021-08-24 15:59:29,932] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.run_2, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155929
[2021-08-24 15:59:29,965] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,027] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.run_1
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
1
[2021-08-24 15:59:30,060] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.run_1, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155930
[2021-08-24 15:59:30,087] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,124] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.run_2
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
2
[2021-08-24 15:59:30,151] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.run_2, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155930
[2021-08-24 15:59:30,185] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,238] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 3 | succeeded: 5 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 3
[2021-08-24 15:59:30,275] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.agg 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:30,310] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.agg 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:34,826] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.agg
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[1, 2]
[2021-08-24 15:59:34,833] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.agg, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155934
[2021-08-24 15:59:34,859] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:34,904] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.agg
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[1, 2]
[2021-08-24 15:59:34,915] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.agg, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155934
[2021-08-24 15:59:34,945] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:34,982] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 7 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-08-24 15:59:35,014] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.task_end 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:39,829] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=task_end
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[2021-08-24 15:59:39,841] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=task_end, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155939
[2021-08-24 15:59:39,867] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:39,890] {dagrun.py:444} INFO - Marking run <DagRun test_tg @ 2021-08-24 00:00:00+00:00: backfill__2021-08-24T00:00:00+00:00, externally triggered: False> successful
[2021-08-24 15:59:39,898] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 8 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-08-24 15:59:39,905] {backfill_job.py:831} INFO - Backfill done. Exiting.

Output of airflow info:

default@91172692e679:/opt/airflow$ airflow info

Apache Airflow
version                | 2.1.2                                                 
executor               | CeleryExecutor                                        
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler   
sql_alchemy_conn       | postgresql+psycopg2://airflow:airflow@postgres/airflow
dags_folder            | /opt/airflow/dags                                     
plugins_folder         | /opt/airflow/plugins                                  
base_log_folder        | /opt/airflow/logs                                     
remote_base_log_folder |                                  
...
...