Does Airflow retain the same database connection?

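I use Airflow for some ETL work, and at certain stages I want to use temporary tables (mainly to keep the code and data objects self-contained and to avoid using a lot of metadata tables).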

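Using the Postgres connection in Airflow and the "PostgresOperator", the behaviour I found is this: each execution of a PostgresOperator gets a new connection (or session) in the database. In other words, we lose all temporary objects created by the previous component of the DAG.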

To emulate a simple example, I used this code (don't run it, just look at the objects):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 13),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'refresh_views',
    default_args=default_args)

# Create database workflow
drop_exist_temporary_view = "DROP TABLE IF EXISTS temporary_table_to_be_used;"

create_temporary_view = """
CREATE TEMPORARY TABLE temporary_table_to_be_used AS 
SELECT relname AS views
       ,CASE WHEN relispopulated = 'true' THEN 1 ELSE 0 END AS relispopulated
       ,CAST(reltuples AS INT) AS reltuples
FROM pg_class 
WHERE relname = 'some_view'
ORDER BY reltuples ASC;"""

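use_temporary_view = """
DO $$
DECLARE
  -- 0 = view not yet populated, 1 = view already populated
  is_materialized integer := (SELECT relispopulated FROM temporary_table_to_be_used WHERE views LIKE '%<<some_name>>%');
  -- Assumed: the name of the view to refresh is read from the same temporary table
  view_to_refresh text := (SELECT views FROM temporary_table_to_be_used WHERE views LIKE '%<<some_name>>%');
  start_time timestamptz;
BEGIN

start_time := clock_timestamp();
    IF is_materialized = 0 THEN
       EXECUTE 'REFRESH MATERIALIZED VIEW ' || view_to_refresh || ' WITH DATA;';
    ELSE 
       EXECUTE 'REFRESH MATERIALIZED VIEW CONCURRENTLY ' || view_to_refresh || ' WITH DATA;';
    END IF;

END;
$$ LANGUAGE plpgsql;
"""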

# Objects to be executed
drop_exist_temporary_view = PostgresOperator(
    task_id='drop_exist_temporary_view',
    sql=drop_exist_temporary_view,
    postgres_conn_id='dwh_staging',
    dag=dag)

create_temporary_view = PostgresOperator(
    task_id='create_temporary_view',
    sql=create_temporary_view,
    postgres_conn_id='dwh_staging',
    dag=dag)

use_temporary_view = PostgresOperator(
    task_id='use_temporary_view',
    sql=use_temporary_view,
    postgres_conn_id='dwh_staging',
    dag=dag)

# Data workflow
drop_exist_temporary_view >> create_temporary_view >> use_temporary_view

At the end of the execution, I receive the following message:

[2018-06-14 15:26:44,807] {base_task_runner.py:95} INFO - Subtask: psycopg2.ProgrammingError: relation "temporary_table_to_be_used" does not exist

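Does anyone know whether Airflow has a way to retain the same connection to the database? I think it could save a lot of work creating and maintaining objects in the database.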

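You can retain the connection to the database by building a custom operator that uses the PostgresHook to hold one connection open while you run a set of SQL operations.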

You can find some examples in contrib on incubator-airflow or in Airflow-Plugins.
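The custom-operator idea above could look roughly like the sketch below. It is a minimal sketch, not a definitive implementation: the helper runs several statements on one DB-API connection, so a temporary table created by an early statement is still visible to the later ones. `run_in_one_session` is a hypothetical name. `sqlite3` stands in for Postgres only so that the sketch runs without a server (its temporary tables are also scoped to a single connection). Inside a custom operator's `execute()`, the connection would instead come from `PostgresHook(postgres_conn_id='dwh_staging').get_conn()`.

```python
import sqlite3
from contextlib import closing


def run_in_one_session(conn, statements):
    """Run every statement on the same DB-API connection, then commit.

    Returns the rows fetched by the last statement (empty list if none).
    """
    rows = []
    with closing(conn.cursor()) as cur:
        for sql in statements:
            cur.execute(sql)
            # cursor.description is None for statements that return no rows
            rows = cur.fetchall() if cur.description is not None else []
    conn.commit()
    return rows


# Stand-in for PostgresHook(...).get_conn(); an assumption for this demo only.
conn = sqlite3.connect(":memory:")
result = run_in_one_session(conn, [
    "DROP TABLE IF EXISTS temporary_table_to_be_used",
    "CREATE TEMPORARY TABLE temporary_table_to_be_used AS "
    "SELECT 'some_view' AS views, 1 AS relispopulated",
    "SELECT views, relispopulated FROM temporary_table_to_be_used",
])
conn.close()
```

The point of the design is that all three steps from the question become one task, so they share one session; splitting them back into separate tasks would bring back the "relation does not exist" error.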

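Another option is to persist this temporary data to XComs. This lets you keep the data together with the metadata of the task that created it, which may help solve the problem.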
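A sketch of the XCom route, assuming the intermediate result is small enough to be stored in Airflow's metadata database. The two functions are meant to be used as the `python_callable` of `PythonOperator` tasks that receive the task context (on Airflow 1.x this needs `provide_context=True`). `fetch_view_state` and `choose_refresh_sql` are hypothetical names, the hard-coded row stands in for a real query, and `FakeTaskInstance` exists only so the sketch runs without Airflow installed.

```python
def fetch_view_state(**context):
    """Upstream task: store the row a temp table would have held as an XCom."""
    # Stand-in for a real query against pg_class (assumption for this sketch).
    state = {"views": "some_view", "relispopulated": 1}
    context["ti"].xcom_push(key="view_state", value=state)


def choose_refresh_sql(**context):
    """Downstream task: read the XCom and build the REFRESH statement."""
    state = context["ti"].xcom_pull(task_ids="fetch_view_state", key="view_state")
    # CONCURRENTLY only works on a view that is already populated.
    mode = "CONCURRENTLY " if state["relispopulated"] else ""
    return "REFRESH MATERIALIZED VIEW {}{} WITH DATA;".format(mode, state["views"])


class FakeTaskInstance:
    """Minimal stand-in for Airflow's TaskInstance, for local runs only."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, task_ids=None, key=None):
        return self._store.get(key)


ti = FakeTaskInstance()
fetch_view_state(ti=ti)
refresh_sql = choose_refresh_sql(ti=ti)
```

Unlike a temporary table, the value survives across tasks and connections, at the cost of being serialized into the metadata database, so this suits small lookups rather than bulk data.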