Airflow - Error raised from SQLAlchemy when trying to run a TaskInstance

I'm testing the execution of a crawler inside the Airflow framework. If I run the following script, everything works and the payload is printed.

from airflow import DAG
from airflow.models import BaseOperator, TaskInstance
from hooks.crawler_hook import CrawlerHook

from datetime import datetime
import time

class CrawlerOperator(BaseOperator):
    def __init__(self, conn_id=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id

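    # Airflow calls execute(context=...); defaulting to None keeps the direct call below working
    def execute(self, context=None):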
        hook = CrawlerHook(conn_id=self.conn_id)
        print(hook.run())

if __name__ == "__main__":
    CrawlerOperator(task_id='test_run').execute()

But when I try to run the TaskInstance inside a DAG, I get an error and can't understand why:

if __name__ == "__main__":
    with DAG(dag_id="DAG1", start_date=datetime.now(), catchup=False) as dag:
        to = CrawlerOperator(task_id="test_run")
        ti = TaskInstance(task=to)
        ti.run()

Error:

Traceback (most recent call last):
  File "/home/../.env/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/../.env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1123, in get_dagrun
    dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
  File "/home/../.env/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3500, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()

Any suggestions?

I assume you're using this in some kind of unit test. What you're missing (as the error indicates) is a DagRun:

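from airflow.models import DagRun
from airflow.utils import timezone

# Attach a DagRun to the TaskInstance before calling ti.run()
dag_run = DagRun(dag_id=dag.dag_id, execution_date=timezone.utcnow(), run_id="test")
ti.dag_run = dag_run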

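This is required because a TaskInstance belongs to a specific DagRun, not directly to the DAG, and one DAG can have many DagRuns. As the traceback shows, get_dagrun() looks the DagRun up by dag_id and run_id with .one(), which raises NoResultFound when no matching row exists.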
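To make the relationship concrete, here is a toy model in plain Python. It is a hypothetical sketch, not Airflow's or SQLAlchemy's API: the classes and the list standing in for the `dag_run` table are invented for illustration. It mimics the lookup in the traceback, where a task instance resolves its run by `(dag_id, run_id)` and requires exactly one match, and it shows how a pre-attached run sidesteps that lookup.

```python
# Hypothetical toy model, NOT Airflow's API: it only mimics the lookup
# seen in the traceback (filter by dag_id and run_id, then .one()).
from dataclasses import dataclass
from typing import List, Optional


class NoResultFound(Exception):
    """Stand-in for sqlalchemy.orm.exc.NoResultFound."""


class MultipleResultsFound(Exception):
    """Stand-in for sqlalchemy.orm.exc.MultipleResultsFound."""


@dataclass
class DagRun:
    dag_id: str
    run_id: str


@dataclass
class TaskInstance:
    dag_id: str
    run_id: Optional[str]
    dag_run: Optional[DagRun] = None  # a pre-attached run skips the lookup

    def get_dagrun(self, table: List[DagRun]) -> DagRun:
        if self.dag_run is not None:
            return self.dag_run
        # Like query(DagRun).filter(dag_id == ..., run_id == ...).one()
        rows = [r for r in table
                if r.dag_id == self.dag_id and r.run_id == self.run_id]
        if not rows:
            raise NoResultFound("No row was found for one()")
        if len(rows) > 1:
            raise MultipleResultsFound("Multiple rows were found for one()")
        return rows[0]


if __name__ == "__main__":
    # One DAG, many runs: the run_id picks which run a task instance belongs to.
    runs = [DagRun("DAG1", "manual_1"), DagRun("DAG1", "manual_2")]
    print(TaskInstance("DAG1", "manual_2").get_dagrun(runs).run_id)

    # No run_id and no attached run, as in the question: the lookup fails.
    try:
        TaskInstance("DAG1", None).get_dagrun(runs)
    except NoResultFound as exc:
        print("lookup failed:", exc)

    # Attaching a DagRun, as the answer does, avoids the query entirely.
    ti = TaskInstance("DAG1", None, dag_run=DagRun("DAG1", "test"))
    print(ti.get_dagrun([]).run_id)
```

In real Airflow, `ti.run()` still talks to the metadata database for other steps, so a properly initialized database may be needed beyond attaching the DagRun.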

You can see an example in one of the unit tests in the Airflow codebase.