Airflow - getting the execution_date in task when calling an Operator
I have this operator, which is pretty much the same as the S3CopyObjectOperator except that it looks up all objects under a folder and copies them to a destination folder:
import os
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.decorators import apply_defaults
from common.s3.partition import Partition, PartitionType
from airflow.models import BaseOperator
import logging


class S3CopyObjectsOperator(BaseOperator):
    @apply_defaults
    def __init__(self,
                 aws_conn_id: str,
                 partition: Partition,
                 s3_bucket: str,
                 dest_prefix: str,
                 *args,
                 **kwargs):
        super(S3CopyObjectsOperator, self).__init__(*args, **kwargs)
        self.aws_conn_id = aws_conn_id
        self.partition = partition
        self.s3_bucket = s3_bucket
        self.dest_prefix = dest_prefix

    def execute(self, context):
        self.partition.partition_value = context.get("execution_date")
        logging.info(f'self.dest_prefix: {self.dest_prefix}')
        exec_date = context.get("execution_date")
        logging.info(f'self.partition.partition_value: {self.partition.partition_value}')
        s3 = S3Hook(self.aws_conn_id)
        s3_conn = s3.get_conn()
        logging.info(f'source bucket -- self.partition.bucket: {self.partition.bucket}')
        logging.info(f'source key -- self.partition.key_prefix: {self.partition.key_prefix}')
        source_keys = s3.list_keys(bucket_name=self.partition.bucket, prefix=self.partition.key_prefix, delimiter="/")
        logging.info(f'keys: {source_keys}')
        for file in source_keys:
            prefix, filename = os.path.split(file)
            dest_key = f'{self.dest_prefix}/{filename}'
            logging.info(f'Copying file {filename} to {self.dest_prefix}')
            key = self.partition.key_prefix + filename
            logging.info(f'key: {key}')
            s3_conn.copy_object(Bucket=self.s3_bucket,
                                Key=f'{dest_key}',
                                CopySource={
                                    'Bucket': self.partition.bucket,
                                    'Key': key
                                }, ContentEncoding='csv')
However, when I use this operator in my task, I need my dest_prefix to include the execution date.
Things I've tried:
I tried adding ds = '{{ ds_nodash }}' in the DAG file, but when I print self.dest_prefix in the operator, the value returned is the literal string rather than the execution date.
I have also tried creating a function, but when I print self.dest_prefix in the operator, the value returned is: self.dest_prefix: <function exec_value at 0x7fd008fcb940>
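(For reference, a hypothetical reconstruction of that second attempt; exec_value is the name taken from the log output above. Airflow does nothing special with a function passed as an ordinary operator argument, it just stores the object, which is why the log shows the function's repr:)

def exec_value():
    # Hypothetical helper from the attempt described above.
    # Nothing ever calls this for a plain operator argument.
    return '{{ ds_nodash }}'

# dest_prefix now holds the function object itself, hence
# "<function exec_value at 0x...>" in the operator's log. Even calling it,
# exec_value(), would only return the unrendered template string.
dest_prefix = exec_value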
Please see the task below; the execution date should go right after snapshot_date=:
for data_group in data_group_names:
    copy_felix_to_s3 = S3CopyObjectsOperator(
        task_id=f'copy_felix_{data_group}_data_to_s3',
        aws_conn_id='aws_default',
        s3_bucket='bucket_name',
        partition=felixS3Partition(
            bucket='source_bucket',
            location_base=f'our_bucket/{data_group}',
            partition_type=None
        ),
        dest_prefix=f"felix/{data_group}/snapshot_date={ds}",
        dag=dag
    )
    copy_felix_to_s3
You are missing declaring the parameter as a templated field:
class S3CopyObjectsOperator(BaseOperator):
    ...
    template_fields = ("dest_prefix",)
    ...
Macros (e.g. ds_nodash) are available only for templated fields, so if you don't specify template_fields, Airflow treats the value you pass as a plain string and it will not be rendered.
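Putting the two pieces together, a minimal sketch of the fix, reusing the names from the question (felixS3Partition, data_group, and dag are assumed to be defined in your DAG file as above). Note the doubled braces: because dest_prefix is built with an f-string, {{{{ ds_nodash }}}} in source code produces the literal {{ ds_nodash }} that Jinja later renders:

class S3CopyObjectsOperator(BaseOperator):
    # Jinja is applied to every attribute listed here before execute() runs.
    template_fields = ("dest_prefix",)
    ...

copy_felix_to_s3 = S3CopyObjectsOperator(
    task_id=f'copy_felix_{data_group}_data_to_s3',
    aws_conn_id='aws_default',
    s3_bucket='bucket_name',
    partition=felixS3Partition(
        bucket='source_bucket',
        location_base=f'our_bucket/{data_group}',
        partition_type=None
    ),
    # renders to e.g. felix/<data_group>/snapshot_date=20230101 at run time
    dest_prefix=f"felix/{data_group}/snapshot_date={{{{ ds_nodash }}}}",
    dag=dag
)

You can verify the result without running the task: airflow tasks render <dag_id> <task_id> <execution_date> prints the rendered value of every templated field.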