Airflow "Something Bad Has Happened" Error: Session Table does not exist

I am trying to create a simple test DAG that runs a test query against a Postgres instance on AWS EC2 behind a bastion host.

After adding this script to Airflow (created with `touch pg_test.py` and edited with `nano pg_test.py`)
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime

Query = """DROP TABLE IF EXISTS dataset.test;
create TABLE dataset.test  as (select * from table
);"""

dag = DAG(
    'postgres_test_dag',
    schedule_interval = '0 * * * *',
    start_date = datetime(2021, 3, 20), catchup = False
)

create_table = PostgresOperator(
    task_id='create_table',
    dag=dag,
    postgres_conn_id='postgres_dwh',
    sql=Query
)

create_table

and installing the Airflow Postgres provider with
`pip install apache-airflow-providers-postgres`

I get this error, and I don't know how to solve it:

Something bad has happened.

Airflow is used by many users, and it is very likely that others had similar problems and you can easily find
a solution to your problem.

Consider following these steps:

  * gather the relevant information (detailed logs with errors, reproduction steps, details of your deployment)

  * find similar issues using:
     * GitHub Discussions
     * GitHub Issues
     * Stack Overflow
     * the usual search engine you use on a daily basis

  * if you run Airflow on a Managed Service, consider opening an issue using the service support channels

  * if you tried and have difficulty with diagnosing and fixing the problem yourself, consider creating a bug report.
    Make sure however, to include all relevant details and results of your investigation so far.

Python version: 3.8.10
Airflow version: 2.2.4
Node: ip-XXXXXXXXX.ec2.internal
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedTable: relation "session" does not exist
LINE 2: FROM session 
             ^


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1953, in full_dispatch_request
    return self.finalize_request(rv)
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1970, in finalize_request
    response = self.process_response(response)
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 2269, in process_response
    self.session_interface.save_session(self, ctx.session, response)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/session.py", line 32, in save_session
    return super().save_session(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/flask_session/sessions.py", line 553, in save_session
    saved_session = self.sql_session_model.query.filter_by(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3429, in first
    ret = list(self[0:1])
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
    return list(res)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "session" does not exist
LINE 2: FROM session 
             ^

[SQL: SELECT session.id AS session_id_1, session.session_id AS session_session_id, session.data AS session_data, session.expiry AS session_expiry 
FROM session 
WHERE session.session_id = %(session_id_1)s 
 LIMIT %(param_1)s]
[parameters: {'session_id_1': '3c0eb88d-9042-4951-94a1-c6d127d02450', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/13/f405)

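I have already tried restarting the Airflow instance and reconnecting to the terminal, without success. I suspect it has to do with the connection between the Postgres instance (AWS EC2) and Airflow's internal Postgres database.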

Do you have any suggestions, and perhaps a precise explanation of the problem? It would be much appreciated.

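The problem is that Airflow is looking for the `session` table in its metadata database and not finding it. The traceback shows the failing query coming from the webserver's session handling (`flask_session`), not from your DAG or the `postgres_dwh` connection. I can see in the migrations that this table was added recently. You can run `airflow db upgrade` to apply the latest migrations, which should fix your problem.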
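
If it helps to tell this case apart from a DAG or connection problem, the traceback text itself can be checked for the two tell-tale signs: a missing relation and a `flask_session` frame. This is only a rough sketch using the standard library; the helper names `missing_relation` and `looks_like_missing_metadata_table` are made up for illustration and are not part of Airflow, and the heuristic is an assumption based on the traceback in the question.

```python
import re
from typing import Optional

# Matches the PostgreSQL "undefined table" message as it appears in the
# traceback, e.g.:
#   psycopg2.errors.UndefinedTable: relation "session" does not exist
_MISSING_RELATION = re.compile(r'relation "([^"]+)" does not exist')


def missing_relation(traceback_text: str) -> Optional[str]:
    """Return the first relation reported as missing, or None (hypothetical helper)."""
    match = _MISSING_RELATION.search(traceback_text)
    return match.group(1) if match else None


def looks_like_missing_metadata_table(traceback_text: str) -> bool:
    """Heuristic (an assumption, not an Airflow API): a missing relation plus a
    flask_session frame points at the webserver's metadata database rather
    than at the database a DAG task connects to."""
    return (
        missing_relation(traceback_text) is not None
        and "flask_session" in traceback_text
    )


# Two lines taken from the traceback in the question.
sample = (
    'File "/usr/local/lib/python3.8/dist-packages/flask_session/sessions.py", '
    "line 553, in save_session\n"
    'psycopg2.errors.UndefinedTable: relation "session" does not exist\n'
)

print(missing_relation(sample))
print(looks_like_missing_metadata_table(sample))
```

When both checks match, the fix belongs on the metadata database side (running the migrations), and the DAG code and its Postgres connection can be left alone.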