Airflow - getting the execution_date in a task when calling an Operator

I have this operator, which is almost identical to S3CopyObjectOperator except that it looks up all the objects in a folder and copies them to a destination folder.

import os
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.decorators import apply_defaults
from common.s3.partition import Partition, PartitionType
from airflow.models import BaseOperator
import logging

class S3CopyObjectsOperator(BaseOperator):
    @apply_defaults
    def __init__(self,
                 aws_conn_id: str,
                 partition: Partition,
                 s3_bucket: str,
                 dest_prefix: str,
                 *args,
                 **kwargs):
        super(S3CopyObjectsOperator, self).__init__(*args, **kwargs)
        self.aws_conn_id = aws_conn_id
        self.partition = partition
        self.s3_bucket = s3_bucket
        self.dest_prefix = dest_prefix

    def execute(self, context):
        self.partition.partition_value = context.get("execution_date")
        logging.info(f'self.dest_prefix: {self.dest_prefix}')
        exec_date = context.get("execution_date")

        logging.info(f'self.partition.partition_value: {self.partition.partition_value}')

        s3 = S3Hook(self.aws_conn_id)
        s3_conn = s3.get_conn()

        logging.info(f'source bucket -- self.partition.bucket: {self.partition.bucket}')
        logging.info(f'source key -- self.partition.key_prefix: {self.partition.key_prefix}')

        source_keys = s3.list_keys(bucket_name=self.partition.bucket, prefix=self.partition.key_prefix, delimiter="/")

        logging.info(f'keys: {source_keys}')

        for file in source_keys:

            prefix, filename = os.path.split(file)

            dest_key = f'{self.dest_prefix}/{filename}'

            logging.info(f'Copying file {filename} to {self.dest_prefix}')

            key = self.partition.key_prefix + filename
            logging.info(f'key: {key}')

            s3_conn.copy_object(Bucket=self.s3_bucket,
                                Key=f'{dest_key}',
                                CopySource={
                                    'Bucket': self.partition.bucket,
                                    'Key': key
                                    }, ContentEncoding='csv')

However, when I use this operator in my task, I need my dest_prefix to include the execution date.

What I've tried: I tried adding ds = '{{ ds_nodash }}' in the DAG file, but when I print self.dest_prefix in the operator it returns the literal string rather than the execution date. I also tried creating a function, but then printing self.dest_prefix in the operator returns: self.dest_prefix: <function exec_value at 0x7fd008fcb940>. See the task below:

The execution date should go after snapshot_date=

for data_group in data_group_names:
    copy_felix_to_s3 = S3CopyObjectsOperator(
        task_id=f'copy_felix_{data_group}_data_to_s3',
        aws_conn_id='aws_default',
        s3_bucket='bucket_name',
        partition=felixS3Partition(
            bucket='source_bucket',
            location_base=f'our_bucket/{data_group}',
            partition_type=None
        ),
        dest_prefix=f"felix/{data_group}/snapshot_date= ds",
        dag=dag
    )

    copy_felix_to_s3

You are missing the declaration of the parameter as a templated field.

class S3CopyObjectsOperator(BaseOperator):
    ...
    template_fields = ("dest_prefix",)
    ...

Macros (e.g. ds_nodash) only work on templated fields, so if you don't specify template_fields, the value you pass in is treated as a plain string and will never be rendered.
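
Once dest_prefix is listed in template_fields, Airflow renders any Jinja macros in the value before execute() is called, so the operator receives the already-expanded date. A minimal sketch of the corrected task definition, reusing the names from the question (data_group_names, felixS3Partition and dag are assumed to exist as in the original DAG file); note the doubled braces, which are needed because the string is also a Python f-string:

for data_group in data_group_names:
    copy_felix_to_s3 = S3CopyObjectsOperator(
        task_id=f'copy_felix_{data_group}_data_to_s3',
        aws_conn_id='aws_default',
        s3_bucket='bucket_name',
        partition=felixS3Partition(
            bucket='source_bucket',
            location_base=f'our_bucket/{data_group}',
            partition_type=None
        ),
        # the f-string turns {{{{ ds_nodash }}}} into {{ ds_nodash }},
        # which Airflow then renders at run time, e.g. to 20230101
        dest_prefix=f"felix/{data_group}/snapshot_date={{{{ ds_nodash }}}}",
        dag=dag
    )

Inside execute(), self.dest_prefix then already contains the rendered snapshot date, so no extra handling of execution_date is needed for the destination path.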