如何根据气流时间戳添加数据库分区
How to add database partition based on airflow timestamp
我在这里尝试从开源中获取数据并将其添加到 table 作为基于气流时间戳的分区。但它引发了气流异常。
def partition_sql(entity_type):
sql = """
ALTER TABLE db.table
ADD IF NOT EXISTS PARTITION (airflow_ts='{{ts}}')
LOCATION 's3://db/table/update/airflow_ts={{ts}}';
"""
return sql
with DAG(parameters)as dag:
update = DockerOperator(
task_id='update',
cmd = 'python script.py 's3://db/table1/update/airflow_ts={{ts}}'
)
partition = AWSAthenaOperator(
task_id='partition',
query=partition_sql("artist"),
)
update >>partition
因为 cmd
和 query
字段都是模板化的,所以这应该有效:
items = ["artist"] #add more tables to be created dynamically
with DAG(
dag_id="dag_name",
default_args=default_args,
) as dag:
for item in items:
command = f'python script.py 's3://db/{item}/update/airflow_ts={{ ds }} '
update = DockerOperator(
task_id=f'update_table_{item}',
cmd=command
)
sql = f'ALTER TABLE db.{item} ADD IF NOT EXISTS PARTITION (airflow_ts={{ ds }}) LOCATION s3://db/table/update/airflow_ts={{ ds }};'
partition = AWSAthenaOperator(
task_id=f'partition_table_{item}',
query=sql
)
您可以从 {{ ds }}
更改为您喜欢的任何其他日期格式。您可以在 macros 页面上查看可用格式或自定义一种您自己的格式。
请注意,您不必在此处的代码中保存 SQL。您可以按照
的说明将其保存在 .sql
个文件中
我在这里尝试从开源中获取数据并将其添加到 table 作为基于气流时间戳的分区。但它引发了气流异常。
def partition_sql(entity_type):
sql = """
ALTER TABLE db.table
ADD IF NOT EXISTS PARTITION (airflow_ts='{{ts}}')
LOCATION 's3://db/table/update/airflow_ts={{ts}}';
"""
return sql
with DAG(parameters)as dag:
update = DockerOperator(
task_id='update',
cmd = 'python script.py 's3://db/table1/update/airflow_ts={{ts}}'
)
partition = AWSAthenaOperator(
task_id='partition',
query=partition_sql("artist"),
)
update >>partition
因为 cmd
和 query
字段都是模板化的,所以这应该有效:
items = ["artist"] #add more tables to be created dynamically
with DAG(
dag_id="dag_name",
default_args=default_args,
) as dag:
for item in items:
command = f'python script.py 's3://db/{item}/update/airflow_ts={{ ds }} '
update = DockerOperator(
task_id=f'update_table_{item}',
cmd=command
)
sql = f'ALTER TABLE db.{item} ADD IF NOT EXISTS PARTITION (airflow_ts={{ ds }}) LOCATION s3://db/table/update/airflow_ts={{ ds }};'
partition = AWSAthenaOperator(
task_id=f'partition_table_{item}',
query=sql
)
您可以从 {{ ds }}
更改为您喜欢的任何其他日期格式。您可以在 macros 页面上查看可用格式或自定义一种您自己的格式。
请注意,您不必在此处的代码中保存 SQL。您可以按照
.sql
个文件中