我如何安排我的 DAG 每 5 分钟 运行?
How can I schedule my DAG to run every 5 minutes?
我正在使用 GCP cloud composer 并且有一个气流实例和一个 DAG。
DAG 的默认参数是:
# Define the default arguments for the DAG.
default_args = {
# If the start date is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG
# uploads.
"start_date": datetime.datetime(2021, 12, 2) #yesterday
,"owner": "foobar"
,"schedule_interval": "*/5 * * * *"
,"tags": ["create_empty_dataset", "BigQuery"]
DAG 运行 没问题,但时间表(每 5 分钟)就不行。我尝试使用 datetime.timedelta(minutes=5)
但它没有用。它只运行一次,仅此而已。这就是我所看到的:
有什么想法吗?如有必要,我可以编辑和共享 DAG 代码。很简单。
谢谢!
您的问题在于您如何设置参数。
default_args
在 DAG 级别设置,但它包含传递给运算符的参数。因此,当您在 default_args
中设置 schedule_interval
时,这意味着该参数将传递给 DAG 中的所有运算符,这没有任何意义,因为运算符没有 schedule_interval
参数。
您应该在 DAG 构造器中设置 schedule_interval
:
DAG = (..., schedule_interval="*/5 * * * *", default_args=default_args)
我正在使用 GCP cloud composer 并且有一个气流实例和一个 DAG。
DAG 的默认参数是:
# Define the default arguments for the DAG.
default_args = {
# If the start date is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG
# uploads.
"start_date": datetime.datetime(2021, 12, 2) #yesterday
,"owner": "foobar"
,"schedule_interval": "*/5 * * * *"
,"tags": ["create_empty_dataset", "BigQuery"]
DAG 运行 没问题,但时间表(每 5 分钟)就不行。我尝试使用 datetime.timedelta(minutes=5)
但它没有用。它只运行一次,仅此而已。这就是我所看到的:
有什么想法吗?如有必要,我可以编辑和共享 DAG 代码。很简单。
谢谢!
您的问题在于您如何设置参数。
default_args
在 DAG 级别设置,但它包含传递给运算符的参数。因此,当您在 default_args
中设置 schedule_interval
时,这意味着该参数将传递给 DAG 中的所有运算符,这没有任何意义,因为运算符没有 schedule_interval
参数。
您应该在 DAG 构造器中设置 schedule_interval
:
DAG = (..., schedule_interval="*/5 * * * *", default_args=default_args)