Does Airflow retain the same database connection?
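I'm using Airflow for some ETL work, and in some stages I would like to use temporary tables (mostly to keep the code and data objects self-contained and to avoid maintaining a lot of metadata tables).
Using the Postgres connection in Airflow and the "PostgresOperator", the behaviour I found is that each execution of a PostgresOperator opens a new connection (or session) in the database. In other words, we lose all temporary objects created by the previous component of the DAG.
To emulate a simple example, I used this code (do not run it, just look at the objects):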
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
default_args = {
'owner': 'airflow'
,'depends_on_past': False
,'start_date': datetime(2018, 6, 13)
,'retries': 3
,'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'refresh_views'
, default_args=default_args)
# Create database workflow
drop_exist_temporary_view = "DROP TABLE IF EXISTS temporary_table_to_be_used;"
create_temporary_view = """
CREATE TEMPORARY TABLE temporary_table_to_be_used AS
SELECT relname AS views
,CASE WHEN relispopulated = 'true' THEN 1 ELSE 0 END AS relispopulated
,CAST(reltuples AS INT) AS reltuples
FROM pg_class
WHERE relname = 'some_view'
ORDER BY reltuples ASC;"""
use_temporary_view = """
DO $$
DECLARE
is_correct integer := (SELECT relispopulated FROM temporary_table_to_be_used WHERE views LIKE '%<<some_name>>%');
BEGIN
start_time := clock_timestamp();
IF is_materialized = 0 THEN
EXECUTE 'REFRESH MATERIALIZED VIEW ' || view_to_refresh || ' WITH DATA;';
ELSE
EXECUTE 'REFRESH MATERIALIZED VIEW CONCURRENTLY ' || view_to_refresh || ' WITH DATA;';
END IF;
END;
$$ LANGUAGE plpgsql;
"""
# Objects to be executed
drop_exist_temporary_view = PostgresOperator(
task_id='drop_exist_temporary_view',
sql=drop_exist_temporary_view,
postgres_conn_id='dwh_staging',
dag=dag)
create_temporary_view = PostgresOperator(
task_id='create_temporary_view',
sql=create_temporary_view,
postgres_conn_id='dwh_staging',
dag=dag)
use_temporary_view = PostgresOperator(
task_id='use_temporary_view',
sql=use_temporary_view,
postgres_conn_id='dwh_staging',
dag=dag)
# Data workflow
drop_exist_temporary_view >> create_temporary_view >> use_temporary_view
At the end of the execution, I receive the following message:
[2018-06-14 15:26:44,807] {base_task_runner.py:95} INFO - Subtask: psycopg2.ProgrammingError: relation "temporary_table_to_be_used" does not exist
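Does anyone know whether Airflow has some way to retain the same connection to the database? I think it could save a lot of work in creating and maintaining several objects in the database.
You can retain the connection to the database by building a custom Operator that uses the PostgresHook to hold one connection open while you perform a set of SQL operations.
You may find some examples in contrib on incubator-airflow or in Airflow-Plugins.
Another option is to persist this temporary data to XCOMs. This lets you keep the metadata together with the task that created it, which may help with troubleshooting later.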
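A minimal sketch of the single-connection idea follows. It is not taken from the Airflow code base: the helper name `run_on_single_connection` and the demo function are hypothetical, and SQLite is used only as a stand-in because its temporary tables are also scoped to one connection. The point is that the temporary table survives as long as every statement goes through the same connection object.

```python
import sqlite3
from contextlib import closing


def run_on_single_connection(get_conn, statements):
    """Run every statement on ONE DB-API connection; return the last result set.

    get_conn is a zero-argument callable returning a DB-API connection.
    Assumption: inside Airflow this could be the get_conn method of a
    PostgresHook instance; here any DB-API driver works.
    """
    rows = None
    with closing(get_conn()) as conn:
        with closing(conn.cursor()) as cur:
            for sql in statements:
                cur.execute(sql)
                # description is None for statements that return no rows
                if cur.description is not None:
                    rows = cur.fetchall()
        # Commit once, after all statements succeeded on the same session
        conn.commit()
    return rows


def demo_with_sqlite():
    """Stand-in demo: the temp table is visible to later statements
    because they share the connection that created it."""
    return run_on_single_connection(
        lambda: sqlite3.connect(":memory:"),
        [
            "CREATE TEMPORARY TABLE temporary_table_to_be_used (reltuples INTEGER)",
            "INSERT INTO temporary_table_to_be_used VALUES (1)",
            "INSERT INTO temporary_table_to_be_used VALUES (2)",
            "SELECT reltuples FROM temporary_table_to_be_used ORDER BY reltuples",
        ],
    )
```

Inside the `execute` method of a custom operator, the same helper could presumably be called as `run_on_single_connection(PostgresHook(postgres_conn_id='dwh_staging').get_conn, [create_sql, use_sql])`, so that the create and use steps become one task instead of two.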
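As a rough illustration of the XCom route, the two callables below pass a small summary between tasks instead of relying on a temporary table. The function names, the `view_stats` key and the hard-coded values are all hypothetical, and the sketch assumes the callables run under a PythonOperator that receives the task context (in Airflow 1.x this needs `provide_context=True`). XComs are meant for small values, so this only fits when the temporary data is a few rows of metadata.

```python
def collect_view_stats(**context):
    """Push a small, serializable summary to XCom.

    The values are placeholders; a real task would read them from pg_class.
    """
    stats = {"view": "some_view", "relispopulated": 1, "reltuples": 0}
    context["ti"].xcom_push(key="view_stats", value=stats)


def build_refresh_statement(**context):
    """Pull the summary pushed by the upstream task and build the SQL.

    Mirrors the DO block in the question: a populated view is refreshed
    concurrently, an unpopulated one is refreshed normally.
    """
    stats = context["ti"].xcom_pull(task_ids="collect_view_stats", key="view_stats")
    concurrently = "CONCURRENTLY " if stats["relispopulated"] else ""
    return "REFRESH MATERIALIZED VIEW {}{} WITH DATA;".format(
        concurrently, stats["view"]
    )
```

The returned statement could then be executed through a hook in the same task, which keeps each task independent of any session state left behind by the previous one.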