Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复 运行

Airflow - Generating tasks dynamically from BigQuery, but tasks are run repeatedly before previous finishes

上下文

我正在尝试使用 Composer、DataProc 和 BigQuery 在 Google Cloud Platform 上构建摄取管道。我在 BigQuery 中有一个 table,其中包含数据源及其相关文件的记录。因此,如果我需要提取 5 个文件,则 BigQuery table 中有 5 条记录。明天它可能是不同数量的文件。因此,我考虑在我的 DAG 中动态构建任务。

高层设计如下:

此设置 运行 没问题。我可以在 Airflow UI.

中看到我的 DAG 和所有动态生成的任务

编辑:只是添加了一些细节。 BigQuery table 的记录少于 25 条,因此查询 table 无关紧要。每 30 秒查询一次 table 是。其次,我只需要这个 DAG 每 4 小时左右 运行 一次。我不打算让我的作曲家 运行ning 暂时离开。我需要每 4 小时启动 Composer,运行 DAG 一次 处理所有可用文件,然后关闭。

问题

当这些 DataProc 任务正在执行时,大约几分钟后 Airflow 会刷新 DAG 并 运行 再次执行同一组任务。在 DataProc 作业控制台中,我看到同一任务的 2 个(有时是 3 个)实例处于 运行ning 状态。这是不可取的。

我试过的

我在任务级别设置了 retries=0,在 DAG 上我设置了 catchup=Falsemax_active_runs=1schedule_interval='@once'。 DAG 的默认参数也有 retries=0.

我认为这个问题是因为我从 BigQuery 中提取记录的部分是普通功能的一部分,而不是它本身的任务。我没有把它放在任务中的原因是因为我找不到将从 BigQuery 获取的结果传递到后续任务的解决方案,我必须在这些任务中循环它们。

我尝试调用 PythonOperator 并在其中执行 Variable.set("df", df),希望我可以循环 Variable.get("df"),但也没有成功。

下面分享相关代码

def fetch_pending_files_from_bq():
    # fetch records from BigQuery and return as dataframe

default_args = {
    'start_date': yesterday,
    'default_timezone': 'utc',
    'retries': 0
}

dag = DAG(
    dagid,
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    description='DAG to ingest data',
    schedule_interval='@once'
)

start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)

pending_files_df = fetch_pending_files_from_bq()

for index, row in pending_files_df.iterrows():
    task = DataProcSparkOperator(
        dag=dag,
        task_id=row["file_name"],
        arguments=dataproc_args,
        region="us-east1",
        job_name="job_{}".format(task_id),
        dataproc_spark_jars=dataproc_jars,
        ....
        ....
    )

    task.set_upstream(start_dag)
    task.set_downstream(end_dag)

我得到了我想要的编排,唯一的问题是我的 DataProc 作业会自动重新运行。

欢迎任何想法。

在深入研究设计时,我意识到 fetch_pending_files_from_bq 不是任务,因此每次刷新 dag 时都会执行它。这导致了多次查询,还导致意外创建重复任务。因此我放弃了这个设计。

我可以使用子标签解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。