Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复 运行
Airflow - Generating tasks dynamically from BigQuery, but tasks are run repeatedly before previous finishes
上下文
我正在尝试使用 Composer、DataProc 和 BigQuery 在 Google Cloud Platform 上构建摄取管道。我在 BigQuery 中有一个 table,其中包含数据源及其相关文件的记录。因此,如果我需要提取 5 个文件,则 BigQuery table 中有 5 条记录。明天它可能是不同数量的文件。因此,我考虑在我的 DAG 中动态构建任务。
高层设计如下:
- 执行函数以 Pandas 数据帧(或字典,两者都可以)的形式从 BigQuery 获取数据
- 迭代数据框
- 对于数据框中的每一行,创建一个 DataProcSparkOperator,其中包含有关文件和相应参数的详细信息
此设置 运行 没问题。我可以在 Airflow UI.
中看到我的 DAG 和所有动态生成的任务
编辑:只是添加了一些细节。 BigQuery table 的记录少于 25 条,因此查询 table 无关紧要。每 30 秒查询一次 table 是。其次,我只需要这个 DAG 每 4 小时左右 运行 一次。我不打算让我的作曲家 运行ning 暂时离开。我需要每 4 小时启动 Composer,运行 DAG 一次 处理所有可用文件,然后关闭。
问题
当这些 DataProc 任务正在执行时,大约几分钟后 Airflow 会刷新 DAG 并 运行 再次执行同一组任务。在 DataProc 作业控制台中,我看到同一任务的 2 个(有时是 3 个)实例处于 运行ning 状态。这是不可取的。
我试过的
我在任务级别设置了 retries=0
,在 DAG 上我设置了 catchup=False
、max_active_runs=1
和 schedule_interval='@once'
。 DAG 的默认参数也有 retries=0
.
我认为这个问题是因为我从 BigQuery 中提取记录的部分是普通功能的一部分,而不是它本身的任务。我没有把它放在任务中的原因是因为我找不到将从 BigQuery 获取的结果传递到后续任务的解决方案,我必须在这些任务中循环它们。
我尝试调用 PythonOperator 并在其中执行 Variable.set("df", df)
,希望我可以循环 Variable.get("df")
,但也没有成功。
下面分享相关代码
def fetch_pending_files_from_bq():
# fetch records from BigQuery and return as dataframe
default_args = {
'start_date': yesterday,
'default_timezone': 'utc',
'retries': 0
}
dag = DAG(
dagid,
default_args=default_args,
catchup=False,
max_active_runs=1,
description='DAG to ingest data',
schedule_interval='@once'
)
start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)
pending_files_df = fetch_pending_files_from_bq()
for index, row in pending_files_df.iterrows():
task = DataProcSparkOperator(
dag=dag,
task_id=row["file_name"],
arguments=dataproc_args,
region="us-east1",
job_name="job_{}".format(task_id),
dataproc_spark_jars=dataproc_jars,
....
....
)
task.set_upstream(start_dag)
task.set_downstream(end_dag)
我得到了我想要的编排,唯一的问题是我的 DataProc 作业会自动重新运行。
欢迎任何想法。
在深入研究设计时,我意识到 fetch_pending_files_from_bq 不是任务,因此每次刷新 dag 时都会执行它。这导致了多次查询,还导致意外创建重复任务。因此我放弃了这个设计。
我可以使用子标签解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。
上下文
我正在尝试使用 Composer、DataProc 和 BigQuery 在 Google Cloud Platform 上构建摄取管道。我在 BigQuery 中有一个 table,其中包含数据源及其相关文件的记录。因此,如果我需要提取 5 个文件,则 BigQuery table 中有 5 条记录。明天它可能是不同数量的文件。因此,我考虑在我的 DAG 中动态构建任务。
高层设计如下:
- 执行函数以 Pandas 数据帧(或字典,两者都可以)的形式从 BigQuery 获取数据
- 迭代数据框
- 对于数据框中的每一行,创建一个 DataProcSparkOperator,其中包含有关文件和相应参数的详细信息
此设置 运行 没问题。我可以在 Airflow UI.
中看到我的 DAG 和所有动态生成的任务编辑:只是添加了一些细节。 BigQuery table 的记录少于 25 条,因此查询 table 无关紧要。每 30 秒查询一次 table 是。其次,我只需要这个 DAG 每 4 小时左右 运行 一次。我不打算让我的作曲家 运行ning 暂时离开。我需要每 4 小时启动 Composer,运行 DAG 一次 处理所有可用文件,然后关闭。
问题
当这些 DataProc 任务正在执行时,大约几分钟后 Airflow 会刷新 DAG 并 运行 再次执行同一组任务。在 DataProc 作业控制台中,我看到同一任务的 2 个(有时是 3 个)实例处于 运行ning 状态。这是不可取的。
我试过的
我在任务级别设置了 retries=0
,在 DAG 上我设置了 catchup=False
、max_active_runs=1
和 schedule_interval='@once'
。 DAG 的默认参数也有 retries=0
.
我认为这个问题是因为我从 BigQuery 中提取记录的部分是普通功能的一部分,而不是它本身的任务。我没有把它放在任务中的原因是因为我找不到将从 BigQuery 获取的结果传递到后续任务的解决方案,我必须在这些任务中循环它们。
我尝试调用 PythonOperator 并在其中执行 Variable.set("df", df)
,希望我可以循环 Variable.get("df")
,但也没有成功。
下面分享相关代码
def fetch_pending_files_from_bq():
# fetch records from BigQuery and return as dataframe
default_args = {
'start_date': yesterday,
'default_timezone': 'utc',
'retries': 0
}
dag = DAG(
dagid,
default_args=default_args,
catchup=False,
max_active_runs=1,
description='DAG to ingest data',
schedule_interval='@once'
)
start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)
pending_files_df = fetch_pending_files_from_bq()
for index, row in pending_files_df.iterrows():
task = DataProcSparkOperator(
dag=dag,
task_id=row["file_name"],
arguments=dataproc_args,
region="us-east1",
job_name="job_{}".format(task_id),
dataproc_spark_jars=dataproc_jars,
....
....
)
task.set_upstream(start_dag)
task.set_downstream(end_dag)
我得到了我想要的编排,唯一的问题是我的 DataProc 作业会自动重新运行。
欢迎任何想法。
在深入研究设计时,我意识到 fetch_pending_files_from_bq 不是任务,因此每次刷新 dag 时都会执行它。这导致了多次查询,还导致意外创建重复任务。因此我放弃了这个设计。
我可以使用子标签解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。