运行 带有语句超时气流 dag 的 postgres 运算符
Run postgres operator with statement timeout airflow dag
所以我一直在尝试 运行 在我的 DAG 上使用 2 个 postgres 运算符,看起来像这样:
default_args = {
'owner': 'local',
}
log = logging.getLogger(_name_)
TEMP_SETTLEMENT ="""
set statement_timeout to 0;
select function_a();
"""
VACUM_SETTLEMENT="""
vacuum (verbose, analyze) test_table;
"""
try:
with DAG(
dag_id='temp-test',
default_args=default_args,
schedule_interval=none,
start_date=datetime(2021, 10, 1),
max_active_runs=1,
catchup=False
) as dag:
pg = PostgresOperator(
task_id="data",
postgres_conn_id="connection_1",
database="DB_test",
autocommit=True,
sql=TEMP_SETTLEMENT,
)
vacum = PostgresOperator(
task_id="vacum",
postgres_conn_id="connection_1",
database="DB_test",
autocommit=True,
sql=VACUM_SETTLEMENT, )
pg >> vacum
except ImportError as e:
log.warning("Could not import DAGs: %s", str(e))
当我尝试 运行 temp_settlement 时,我总是遇到语句超时,有什么办法可以保持 statement_timeout=0?
谢谢
更新:
开始 apache-airflow-providers-postgres>=4.1.0
你可以这样做:
PostgresOperator(
...,
runtime_parameters={'statement_timeout': '3000ms'},
)
原答案:
你没有从你的描述中提到它我假设超时来自 Postgres 而不是来自 Airflow。
目前 PostgresOperator
不允许覆盖 hook/connection 设置。
要解决您的问题,您需要按照 docs 中的说明在额外字段中编辑 connection_1
您需要添加 statement_timeout
:
{'statement_timeout': '3600s'}
我打开 https://github.com/apache/airflow/issues/21486 作为后续功能请求,以允许直接从操作员那里设置 statement_timeout
。
所以我一直在尝试 运行 在我的 DAG 上使用 2 个 postgres 运算符,看起来像这样:
default_args = {
'owner': 'local',
}
log = logging.getLogger(_name_)
TEMP_SETTLEMENT ="""
set statement_timeout to 0;
select function_a();
"""
VACUM_SETTLEMENT="""
vacuum (verbose, analyze) test_table;
"""
try:
with DAG(
dag_id='temp-test',
default_args=default_args,
schedule_interval=none,
start_date=datetime(2021, 10, 1),
max_active_runs=1,
catchup=False
) as dag:
pg = PostgresOperator(
task_id="data",
postgres_conn_id="connection_1",
database="DB_test",
autocommit=True,
sql=TEMP_SETTLEMENT,
)
vacum = PostgresOperator(
task_id="vacum",
postgres_conn_id="connection_1",
database="DB_test",
autocommit=True,
sql=VACUM_SETTLEMENT, )
pg >> vacum
except ImportError as e:
log.warning("Could not import DAGs: %s", str(e))
当我尝试 运行 temp_settlement 时,我总是遇到语句超时,有什么办法可以保持 statement_timeout=0?
谢谢
更新:
开始 apache-airflow-providers-postgres>=4.1.0
你可以这样做:
PostgresOperator(
...,
runtime_parameters={'statement_timeout': '3000ms'},
)
原答案:
你没有从你的描述中提到它我假设超时来自 Postgres 而不是来自 Airflow。
目前 PostgresOperator
不允许覆盖 hook/connection 设置。
要解决您的问题,您需要按照 docs 中的说明在额外字段中编辑 connection_1
您需要添加 statement_timeout
:
{'statement_timeout': '3600s'}
我打开 https://github.com/apache/airflow/issues/21486 作为后续功能请求,以允许直接从操作员那里设置 statement_timeout
。