Airflow Scheduler throws error for DAGs with schedule_interval as None
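I am facing an issue with Airflow. I have a custom generator script that takes input from YAML files and loads the DAGs. It works fine when every DAG YAML file has a schedule interval other than "None". However, many of the DAGs have schedule_interval set to None, and a few use @once.
A sample YAML file: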
cluster:
  nodes: 10
  subnet: "subnet-A"
  instance: "m4.2xlarge"
configbucket: "bucketabc"
jar: "s3://xxxxx.jar"
conf: "app.conf"
schedule:
  state: "unpause"
  concurrency: 10
  startdate: "2050-08-05 00:00"
  cron: "None"
The generator script is as follows:
if "schedule" in project_settings:
    schedule_settings = project_settings["schedule"]
    concurrency = schedule_settings["concurrency"]
    cron = schedule_settings["cron"]
    startdate = datetime.strptime(schedule_settings["startdate"], "%Y-%m-%d %H:%M")
    #print "my projectname is: " + project
    dag = DAG(
        dag_id=project,
        default_args=args,
        user_defined_macros=user_macros,
        schedule_interval=cron,
        concurrency=concurrency,
        start_date=startdate
    )
This is the error I get when there are many DAGs with schedule_interval=None:
INFO - [2020-04-08 12:30:45,529] {dagbag.py:302} ERROR - Failed to bag_dag: /home/deploy/airflow/dags/genertor.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 296, in process_file
    croniter(dag._schedule_interval)
  File "/usr/local/lib/python3.6/site-packages/croniter/croniter.py", line 91, in __init__
    self.expanded, self.nth_weekday_of_month = self.expand(expr_format)
  File "/usr/local/lib/python3.6/site-packages/croniter/croniter.py", line 468, in expand
    raise CroniterBadCronError(cls.bad_length)
croniter.croniter.CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.
Has anyone faced this issue?
The Airflow DAG schedule_interval can be a cron expression given as a string, or it can be None (note: the Python object None, not the string "None").
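To see why the string "None" ends up in that traceback, here is a rough sketch of the kind of field-count check the error message describes. This is a simplified stand-in, not croniter's actual code; the name looks_like_cron is hypothetical, and the only thing taken from the traceback is the "5 or 6 columns" rule.

```python
# Simplified stand-in for a cron field-count check (not croniter's real code).
VALID_FIELD_COUNTS = (5, 6)  # mirrors the "5 or 6 columns" in the error message


def looks_like_cron(expr):
    """Return True if expr has 5 or 6 whitespace-separated fields."""
    return len(expr.split()) in VALID_FIELD_COUNTS


print(looks_like_cron("0 12 * * *"))  # five fields, so it passes the length check
print(looks_like_cron("None"))        # one field, so it would be rejected
```

The string "None" is treated as a one-field cron expression, which is why the scheduler complains about the number of columns rather than about a missing schedule.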
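In your setup you have:
cron: "None"
This is a string in Python. Note that removing the quotes (cron: None) would not help either: YAML has no None keyword, so parsers such as PyYAML load that as the string "None" as well. If you cannot change the YAML file to use YAML's null value:
cron: null
you can still check for that string in the DAG itself: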
schedule_interval = None if cron == "None" else cron
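If the YAML files are edited by several people, the one-line check above could be widened into a small helper. The following is only a sketch under my own assumptions: normalize_schedule and NULL_MARKERS are hypothetical names, the list of markers is a guess at what might appear in your files, and the dictionary stands in for one parsed YAML file shaped like the sample in the question.

```python
# Spellings treated as "no schedule" (hypothetical list; adjust to your files).
NULL_MARKERS = {"", "none", "null", "~"}


def normalize_schedule(cron):
    """Map null-like values to a real None; pass everything else through."""
    if cron is None:
        return None
    if isinstance(cron, str) and cron.strip().lower() in NULL_MARKERS:
        return None
    return cron


# Stand-in for one parsed YAML file (shape follows the sample in the question).
project_settings = {"schedule": {"concurrency": 10, "cron": "None"}}
schedule_interval = normalize_schedule(project_settings["schedule"]["cron"])
print(schedule_interval)  # the None object, not the string
```

In the generator script you would then pass schedule_interval=normalize_schedule(cron) to DAG(...). Presets such as "@once" and ordinary cron strings pass through unchanged.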