运行 中的气流 "This connection is closed" 但未在测试中

Airflow "This connection is closed" in run but not in test

我正在使用 airflow 1.7.1.3 和 python 2.7

我创建了一个 DAG,当我 运行 每个任务分别使用

airflow test [myDAG] [myTask] 2016-10-14

然而,

airflow trigger_dag [myDAG]

airflow run [myDAG] [myTask] 2016-10-14

两者都引发 "This connection is closed" SQLalchemy 错误。

[...]
    with self.engine.connect() as connection:
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2016, in connect
    return self._connection_cls(self, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 72, in __init__
    if connection is not None else engine.raw_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2102, in raw_connection
    self.pool.unique_connection, _connection)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2072, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 318, in unique_connection
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 746, in _checkout
    raise exc.InvalidRequestError("This connection is closed")
InvalidRequestError: This connection is closed
[2016-10-14 15:49:30,704] {models.py:1306} INFO - Marking task as FAILED.
[2016-10-14 15:49:30,712] {models.py:1327} ERROR - This connection is closed

这是通过 SQLalchemy 与 Oracle 12 数据库的连接,当我在脚本中使用 session.commit() 时出现此错误。

有人知道什么可以解释这种差异和错误吗?

here 所述,这是一个已知错误。在修复此错误之前,您将无法通过 SQLAlchemy 连接到 Oracle。

问题是由于源代码中的某些 SQL 语法引起的。你不能在 Oracle 中说 "SELECT 1",你需要说 "SELECT 1 FROM DUAL".

也许考虑使用 Airflow 中的一个挂钩:https://github.com/apache/incubator-airflow/tree/master/airflow/hooks

似乎 oracle_hook 可以帮助您。祝你好运。