如何根据气流时间戳添加数据库分区

How to add database partition based on airflow timestamp

我在这里尝试从开源中获取数据并将其添加到 table 作为基于气流时间戳的分区。但它引发了气流异常。

def partition_sql(entity_type):
            sql = """
                ALTER TABLE db.table
                ADD IF NOT EXISTS PARTITION (airflow_ts='{{ts}}')
                LOCATION 's3://db/table/update/airflow_ts={{ts}}';
            """
        return sql
    
with DAG(parameters)as dag:

    update = DockerOperator(
        task_id='update',
        cmd = 'python script.py 's3://db/table1/update/airflow_ts={{ts}}'
     )
  partition = AWSAthenaOperator(
        task_id='partition',
        query=partition_sql("artist"),
    )


 update >>partition

因为 cmdquery 字段都是模板化的,所以这应该有效:

items = ["artist"] #add more tables to be created dynamically
with DAG(
        dag_id="dag_name",
        default_args=default_args,
) as dag:
    for item in items:
        command = f'python script.py 's3://db/{item}/update/airflow_ts={{ ds }} '
        update = DockerOperator(
            task_id=f'update_table_{item}',
            cmd=command
        )
        sql = f'ALTER TABLE db.{item} ADD IF NOT EXISTS PARTITION (airflow_ts={{ ds }}) LOCATION s3://db/table/update/airflow_ts={{ ds }};'
        partition = AWSAthenaOperator(
            task_id=f'partition_table_{item}',
            query=sql
        )

您可以从 {{ ds }} 更改为您喜欢的任何其他日期格式。您可以在 macros 页面上查看可用格式或自定义一种您自己的格式。

请注意,您不必在此处的代码中保存 SQL。您可以按照

的说明将其保存在 .sql 个文件中