Airflow - Error raised from sqlalchemy when trying to run a TaskInstance
I am testing the execution of a crawler inside the Airflow structure. If I run the following script, everything works and the payload is printed.
from airflow import DAG
from airflow.models import BaseOperator, TaskInstance
from hooks.crawler_hook import CrawlerHook
from datetime import datetime
import time


class CrawlerOperator(BaseOperator):
    def __init__(self, conn_id=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id

    def execute(self):
        hook = CrawlerHook(conn_id=self.conn_id)
        print(hook.run())


if __name__ == "__main__":
    CrawlerOperator(task_id='test_run').execute()
But when I try to run a TaskInstance inside a DAG, I get an error and cannot work out why:
if __name__ == "__main__":
    with DAG(dag_id="DAG1", start_date=datetime.now(), catchup=False) as dag:
        to = CrawlerOperator(task_id="test_run")
    ti = TaskInstance(task=to)
    ti.run()
Error:
Traceback (most recent call last):
  File "/home/../.env/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/../.env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1123, in get_dagrun
    dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
  File "/home/../.env/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3500, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()
Any suggestions?
I assume you are using this in some kind of unit test. What you are missing (as the error shows) is a DagRun:
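from airflow.models import DagRun
from airflow.utils import timezone

# `dag` and `ti` are the DAG and TaskInstance objects from the question's script
dag_run = DagRun(dag_id=dag.dag_id, execution_date=timezone.utcnow(), run_id="test")
ti.dag_run = dag_run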
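This is required because a task instance is associated with a DagRun, not with the DAG itself. A single DAG can have many DagRuns.
You can see an example in one of the unit tests in the Airflow codebase.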
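To illustrate why the lookup in the traceback fails, here is a minimal standard-library sketch. It is not Airflow's or SQLAlchemy's code: the in-memory `dag_run` table, the `NoRowFound` exception and the `get_dagrun` helper are hypothetical stand-ins that only mimic the "exactly one row or raise" behaviour seen in the traceback.

```python
import sqlite3

# Toy stand-in for the metadata database; table and columns are illustrative only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, run_id TEXT)")


class NoRowFound(LookupError):
    """Hypothetical stand-in for sqlalchemy.orm.exc.NoResultFound."""


def get_dagrun(dag_id, run_id):
    """Return the matching run row, or raise if there is none (like .one())."""
    row = conn.execute(
        "SELECT dag_id, run_id FROM dag_run WHERE dag_id = ? AND run_id = ?",
        (dag_id, run_id),
    ).fetchone()
    if row is None:
        raise NoRowFound("No row was found for one()")
    return row


# Defining a DAG creates no run row, so looking up its run fails.
try:
    get_dagrun("DAG1", "test")
    lookup_failed = False
except NoRowFound:
    lookup_failed = True

# Once a run row exists for that DAG, the same lookup succeeds.
conn.execute("INSERT INTO dag_run VALUES (?, ?)", ("DAG1", "test"))
found = get_dagrun("DAG1", "test")

# One DAG can own several runs, each identified by its own run_id.
conn.execute("INSERT INTO dag_run VALUES (?, ?)", ("DAG1", "test2"))
run_count = conn.execute(
    "SELECT COUNT(*) FROM dag_run WHERE dag_id = ?", ("DAG1",)
).fetchone()[0]

print(lookup_failed, found, run_count)
```

The point of the sketch is only that the task instance is resolved through a run record, so that record has to exist before the task instance is run.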