dag.py 引发:"airflow.exceptions.AirflowException: Task is missing the start_date parameter",但它在代码中给出

dag.py raises: "airflow.exceptions.AirflowException: Task is missing the start_date parameter", but its given in code

我今天尝试创建我的第一个气流 DAG:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'default_user',
    'start_date': days_ago(2),
    'depends_on_past': True,
    # With this set to true, the pipeline won't run if the previous day failed
    'email': ['demo@email.de'],
    'email_on_failure': True,
    # upon failure this pipeline will send an email to your email set above
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

dummy_task = DummyOperator(task_id='dummy_task', retries=3)

python_task = PythonOperator(task_id='python_task', python_callable=my_func)

dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

我的 Airflow 在 Python3.6.8 上正常 运行,但是当我尝试将 dagbag 导入 airflow 时它抛出这个异常,我不知道为什么:

[2020-05-11 17:11:15,601] {scheduler_job.py:1576} WARNING - No viable dags retrieved from /root/airflow/dags/first_dag.py
[2020-05-11 17:11:15,616] {scheduler_job.py:162} INFO - Processing /root/airflow/dags/first_dag.py took 0.031 seconds
[2020-05-11 17:12:05,647] {scheduler_job.py:154} INFO - Started process (PID=26569) to work on /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,653] {scheduler_job.py:1562} INFO - Processing file /root/airflow/dags/first_dag.py for tasks to queue
[2020-05-11 17:12:05,654] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,654] {dagbag.py:396} INFO - Filling up the DagBag from /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,666] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,662] {dagbag.py:239} ERROR - Failed to import: /root/airflow/dags/first_dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 236, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/lib64/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/airflow/dags/first_dag.py", line 34, in <module>
    dag=dag,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 70, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 422, in __init__
    self.dag = dag
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 548, in dag
    dag.add_task(self)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1301, in add_task
    raise AirflowException("Task is missing the start_date parameter")
airflow.exceptions.AirflowException: Task is missing the start_date parameter

我想我也应该给我的操作员一个 start_date,但他们也应该使用他们的 DAG 中的日期。

那是因为你的两个任务还没有分配给default_args中包含start_date的DAG。

dummy_task = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

python_task = PythonOperator(task_id='python_task', python_callable=my_func, dag=dag)

请注意,您可以使用 DAG 对象作为上下文管理器,如 https://airflow.apache.org/docs/stable/concepts.html#context-manager 中所述,以避免对所有任务重复 dag=dag

示例:

with DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:

    bashtask = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task.set_downstream(bashtask)
    python_task.set_downstream(bashtask)

有同样的问题,您只需将 dag=dag 放入您使用的每个运算符中即可。 因为您的操作员仍然需要更多参数 运行 作为任务,并且这些参数在任务可以 运行.

之前在 DAG 部分中定义

一个例子: -这是错误的:

postgres_task_1 = PostgresOperator(
        task_id="get_param_2",
        postgres_conn_id="aramis_postgres_connection",
        sql="""
            SELECT  param_num_2 FROM public.aramis_meta_task
            """,
    )

-这是对的:

postgres_task_1 = PostgresOperator(
        dag=dag,
        task_id="get_param_2",
        postgres_conn_id="aramis_postgres_connection",
        sql="""
            SELECT  param_num_2 FROM public.aramis_meta_task
            """,
    )