How to give the folder path of an S3 bucket directory to an S3KeySensor in Airflow

Current scenario: this is my S3KeySensor. I have tried it as written below and it works fine.

 s3KeySensor = S3KeySensor(task_id='Check_file_in_s3_{}'.format(country),
                    bucket_key='*.csv',
                    wildcard_match=True,
                    bucket_name='s3sensorbucket',
                    aws_conn_id='my_s3_conn',
                    timeout=18 * 60 * 60,
                    poke_interval=10,
                    soft_fail=True,
                    default_args=default_args
                              )

Error:

 Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$" or be an ARN matching the regex "^arn:(aws).*:(s3|s3-object-lambda):[a-z\-0-9]+:[0-9]{12}:accesspoint[/:].[a-zA-Z0-9\-]{1,63}$|^arn:(aws).*:s3-outposts:[a-z\-0-9]+:[0-9]{12}:outpost[/:][a-zA-Z0-9\-]{1,63}[/:]accesspoint[/:][a-zA-Z0-9\-]{1,63}

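Desired output: I want to write the sensor like this, giving the folder name inside the bucket so that the S3KeySensor looks in that location and then either proceeds or not. I tried a prefix option, but it says there is no argument named prefix. Can you help me solve this?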

s3KeySensor = S3KeySensor(task_id='Check_file_in_s3_{}'.format(country),
                    bucket_key='*.csv',
                    wildcard_match=True,
                    bucket_name='s3sensorbucket/glueProcessed_ke_{}'.format(country),
                    aws_conn_id='my_s3_conn',
                    timeout=18 * 60 * 60,
                    poke_interval=10,
                    soft_fail=True,
                    default_args=default_args
                              )

Put the folder path in bucket_key rather than bucket_name; bucket_name has to be the bare bucket name. From the docs:

bucket_key (str) -- The key being waited on. Supports full s3:// style url or relative path from root level. When it's specified as a full s3:// url, please leave bucket_name as None.

bucket_name (str) -- Name of the S3 bucket. Only needed when bucket_key is not provided as a full s3:// url.

s3KeySensor = S3KeySensor(
    task_id="Check_file_in_s3_{}".format(country),
    bucket_key=f"s3://s3tosensorbucketname/glueProcessed_ke_{country}/*.csv",
    wildcard_match=True,
    aws_conn_id="my_s3_conn",
    timeout=18 * 60 * 60,
    poke_interval=10,
    soft_fail=True,
)
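To see why the attempt in the question fails while the full-URL form is accepted, here is a minimal sketch using only the standard library. It checks a name against the first regex quoted in the error message above, then splits an s3:// URL into bucket and key. The helpers `is_valid_bucket_name` and `split_s3_url` are hypothetical names for illustration, not part of Airflow; the sensor does its own parsing internally, and this only approximates it.

```python
import re
from typing import Tuple
from urllib.parse import urlparse

# First regex quoted in the error message (plain bucket names, no ARNs).
BUCKET_NAME_RE = re.compile(r"^[a-zA-Z0-9.\-_]{1,255}$")


def is_valid_bucket_name(name: str) -> bool:
    """Return True if the name passes the plain bucket-name regex."""
    return BUCKET_NAME_RE.match(name) is not None


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not a full s3:// url: {url!r}")
    # The bucket is the network location; the key is the path
    # without its leading slash.
    return parsed.netloc, parsed.path.lstrip("/")


# A folder path inside bucket_name is rejected because "/" is not allowed.
print(is_valid_bucket_name("s3sensorbucket/glueProcessed_ke_us"))
print(is_valid_bucket_name("s3sensorbucket"))

# The folder belongs to the key part of the URL instead.
print(split_s3_url("s3://s3tosensorbucketname/glueProcessed_ke_us/*.csv"))
```

The slash is the only thing wrong with the original bucket_name value; once the folder moves into the key, the bucket name passes the check.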

Alternatively, if you want to keep using bucket_name, do this:

s3KeySensor = S3KeySensor(
    task_id="Check_file_in_s3_{}".format(country),
    bucket_key=f"glueProcessed_ke_{country}/*.csv",
    bucket_name="s3sensorbucketname",
    wildcard_match=True,
    aws_conn_id="my_s3_conn",
    timeout=18 * 60 * 60,
    poke_interval=10,
    soft_fail=True,
)
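With wildcard_match=True the key is treated as a Unix-style glob. As a rough local illustration of what the folder prefix changes, here is a sketch that assumes fnmatch-style matching (I believe Airflow's S3 hook relies on Python's fnmatch for this, but check the version you run). The key list and the helper `matching_keys` are made up for the example, and fnmatchcase is used so the result does not depend on the operating system.

```python
from fnmatch import fnmatchcase


def matching_keys(keys, pattern):
    """Return the keys that match a Unix-style glob pattern."""
    return [k for k in keys if fnmatchcase(k, pattern)]


# Hypothetical object keys in the bucket.
keys = [
    "glueProcessed_ke_us/part-0000.csv",
    "glueProcessed_ke_us/_SUCCESS",
    "glueProcessed_ke_in/part-0000.csv",
    "part-0000.csv",
]

country = "us"

# With the folder in the key, only that country's CSV files match.
scoped = matching_keys(keys, f"glueProcessed_ke_{country}/*.csv")
print(scoped)

# A bare '*.csv' matches CSV files in every folder, because '*' in
# fnmatch also matches the '/' character.
unscoped = matching_keys(keys, "*.csv")
print(unscoped)
```

This is also why the original bucket_key='*.csv' appeared to work: it would be satisfied by any CSV in the bucket, not just those in the country folder.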