清理气流数据库。幽浮 Table

Cleanup Airflow Database. Xcom Table

在气流中清理 xcom table 的最佳方法是什么?那就是 运行 in docker with postgres db.

我尝试使用查询删除一些数据(从 xcom 删除)并尝试 运行 这个参考:https://cloud.google.com/composer/docs/cleanup-airflow-database 但它不起作用,xcom table 大小仍然不减少

与此案例相关,它在我的主机服务器上占用了大量存储空间

编辑: 我使用的版本:Airflow 1.10.3

对于每个问题,请提供详细信息(版本、堆栈跟踪等),说明您尝试了什么,什么没有奏效。说“它不起作用”并不能说明任何问题,而且很难提供帮助。请参阅 https://whosebug.com/help/how-to-ask 了解如何写出好的问题。

如果您可以直接访问 Metastore,则可以执行此查询:

DELETE FROM xcom;

XCom table 包含一个时间戳,您可以使用它来保存最近的 XComs。例如,此查询删除所有早于 14 天的 XComs:

DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;

如果您不能直接访问 Metastore,您可以创建一个 DAG 来清理 Metastore 中的对象。无论您是 运行 按计划定期清理对象,还是不按计划 运行 何时清理数据库都取决于您自己:

import datetime

from airflow import DAG
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.utils.session import provide_session

with DAG(dag_id="cleanup_xcoms", schedule_interval=None, start_date=datetime.datetime(2022, 1, 1)) as dag:

    @provide_session
    def _delete_xcoms(session=None):
        num_rows_deleted = 0

        try:
            num_rows_deleted = session.query(XCom).delete()
            session.commit()
        except:
            session.rollback()

        print(f"Deleted {num_rows_deleted} XCom rows")

    delete_xcoms = PythonOperator(task_id="delete_xcoms", python_callable=_delete_xcoms)

XCom 对象有几个可以过滤的属性,例如 dag_id:

session.query(XCom).filter(XCom.dag_id == "mydag123").delete()
DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;

执行该查询后,然后 运行 VACUUM FULL; 来自主机