清理气流数据库。幽浮 Table
Cleanup Airflow Database. Xcom Table
在气流中清理 xcom table 的最佳方法是什么?那就是 运行 in docker with postgres db.
我尝试使用查询删除一些数据(从 xcom 删除)并尝试 运行 这个参考:https://cloud.google.com/composer/docs/cleanup-airflow-database 但它不起作用,xcom table 大小仍然不减少
与此案例相关,它在我的主机服务器上占用了大量存储空间
编辑:
我使用的版本:Airflow 1.10.3
对于每个问题,请提供详细信息(版本、堆栈跟踪等),说明您尝试了什么,什么没有奏效。说“它不起作用”并不能说明任何问题,而且很难提供帮助。请参阅 https://whosebug.com/help/how-to-ask 了解如何写出好的问题。
如果您可以直接访问 Metastore,则可以执行此查询:
DELETE FROM xcom;
XCom table 包含一个时间戳,您可以使用它来保存最近的 XComs。例如,此查询删除所有早于 14 天的 XComs:
DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;
如果您不能直接访问 Metastore,您可以创建一个 DAG 来清理 Metastore 中的对象。无论您是 运行 按计划定期清理对象,还是不按计划 运行 何时清理数据库都取决于您自己:
import datetime
from airflow import DAG
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.utils.session import provide_session
with DAG(dag_id="cleanup_xcoms", schedule_interval=None, start_date=datetime.datetime(2022, 1, 1)) as dag:
@provide_session
def _delete_xcoms(session=None):
num_rows_deleted = 0
try:
num_rows_deleted = session.query(XCom).delete()
session.commit()
except:
session.rollback()
print(f"Deleted {num_rows_deleted} XCom rows")
delete_xcoms = PythonOperator(task_id="delete_xcoms", python_callable=_delete_xcoms)
XCom 对象有几个可以过滤的属性,例如 dag_id
:
session.query(XCom).filter(XCom.dag_id == "mydag123").delete()
DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;
执行该查询后,然后 运行 VACUUM FULL;
来自主机
在气流中清理 xcom table 的最佳方法是什么?那就是 运行 in docker with postgres db.
我尝试使用查询删除一些数据(从 xcom 删除)并尝试 运行 这个参考:https://cloud.google.com/composer/docs/cleanup-airflow-database 但它不起作用,xcom table 大小仍然不减少
与此案例相关,它在我的主机服务器上占用了大量存储空间
编辑: 我使用的版本:Airflow 1.10.3
对于每个问题,请提供详细信息(版本、堆栈跟踪等),说明您尝试了什么,什么没有奏效。说“它不起作用”并不能说明任何问题,而且很难提供帮助。请参阅 https://whosebug.com/help/how-to-ask 了解如何写出好的问题。
如果您可以直接访问 Metastore,则可以执行此查询:
DELETE FROM xcom;
XCom table 包含一个时间戳,您可以使用它来保存最近的 XComs。例如,此查询删除所有早于 14 天的 XComs:
DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;
如果您不能直接访问 Metastore,您可以创建一个 DAG 来清理 Metastore 中的对象。无论您是 运行 按计划定期清理对象,还是不按计划 运行 何时清理数据库都取决于您自己:
import datetime
from airflow import DAG
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.utils.session import provide_session
with DAG(dag_id="cleanup_xcoms", schedule_interval=None, start_date=datetime.datetime(2022, 1, 1)) as dag:
@provide_session
def _delete_xcoms(session=None):
num_rows_deleted = 0
try:
num_rows_deleted = session.query(XCom).delete()
session.commit()
except:
session.rollback()
print(f"Deleted {num_rows_deleted} XCom rows")
delete_xcoms = PythonOperator(task_id="delete_xcoms", python_callable=_delete_xcoms)
XCom 对象有几个可以过滤的属性,例如 dag_id
:
session.query(XCom).filter(XCom.dag_id == "mydag123").delete()
DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;
执行该查询后,然后 运行 VACUUM FULL;
来自主机