How to give the folder path of an S3 bucket directory in an Airflow S3KeySensor
Current scenario: this is my S3KeySensor operator. I have tried it as written below and it works fine.
s3KeySensor = S3KeySensor(
    task_id='Check_file_in_s3_{}'.format(country),
    bucket_key='*.csv',
    wildcard_match=True,
    bucket_name='s3sensorbucket',
    aws_conn_id='my_s3_conn',
    timeout=18 * 60 * 60,
    poke_interval=10,
    soft_fail=True,
    default_args=default_args
)
Error:
Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$" or be an ARN matching the regex "^arn:(aws).*:(s3|s3-object-lambda):[a-z\-0-9]+:[0-9]{12}:accesspoint[/:].[a-zA-Z0-9\-]{1,63}$|^arn:(aws).*:s3-outposts:[a-z\-0-9]+:[0-9]{12}:outpost[/:][a-zA-Z0-9\-]{1,63}[/:]accesspoint[/:][a-zA-Z0-9\-]{1,63}
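Desired output: I want the sensor to look like the snippet below, so that I can give my folder name inside the bucket and S3KeySensor checks that location and returns either yes or no. I tried the Prefix option, but it says there is no function named prefix.
Can you help me solve this?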
s3KeySensor = S3KeySensor(
    task_id='Check_file_in_s3_{}'.format(country),
    bucket_key='*.csv',
    wildcard_match=True,
    bucket_name='s3sensorbucket/glueProcessed_ke_{}'.format(country),
    aws_conn_id='my_s3_conn',
    timeout=18 * 60 * 60,
    poke_interval=10,
    soft_fail=True,
    default_args=default_args
)
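The error comes from the bucket name check: the first pattern in the message, ^[a-zA-Z0-9.\-_]{1,255}$, has no "/" in its character class, so a bucket name with a folder appended can never pass. Here is a minimal sketch of that check using only Python's re module. The helper name is_plain_bucket_name and the country value "us" are made up for illustration, and only the first pattern from the message is tested, not the ARN alternatives.

```python
import re

# First pattern quoted in the error message: plain bucket names only.
BUCKET_NAME_RE = re.compile(r"^[a-zA-Z0-9.\-_]{1,255}$")


def is_plain_bucket_name(name: str) -> bool:
    """Return True if `name` passes the plain-bucket-name pattern."""
    return BUCKET_NAME_RE.match(name) is not None


# Bare bucket name: allowed characters only.
print(is_plain_bucket_name("s3sensorbucket"))
# Bucket plus folder: the "/" is outside the character class.
print(is_plain_bucket_name("s3sensorbucket/glueProcessed_ke_us"))
```

The first call prints True and the second prints False, which is why the folder has to go somewhere other than bucket_name.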
Put the folder path in bucket_key instead of bucket_name. From the docs:
bucket_key (str) -- The key being waited on. Supports full s3:// style url or relative path from root level. When it's specified as a full s3:// url, please leave bucket_name as None.
bucket_name (str) -- Name of the S3 bucket. Only needed when bucket_key is not provided as a full s3:// url.
s3KeySensor = S3KeySensor(
    task_id="Check_file_in_s3_{}".format(country),
    bucket_key=f"s3://s3tosensorbucketname/glueProcessed_ke_{country}/*.csv",
    wildcard_match=True,
    aws_conn_id="my_s3_conn",
    timeout=18 * 60 * 60,
    poke_interval=10,
    soft_fail=True,
)
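To see why bucket_name can be left out with the full URL form, here is a rough sketch of how an s3:// URL breaks down into a bucket and a key. This is not Airflow's own parsing code; split_s3_url is a hypothetical helper built on urllib.parse, and "us" is an example country value.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split an s3:// URL into (bucket, key). Hypothetical helper for illustration."""
    parts = urlsplit(url)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError(f"not a full s3:// URL: {url!r}")
    # The host part is the bucket; the path without its leading "/" is the key.
    return parts.netloc, parts.path.lstrip("/")


bucket, key = split_s3_url("s3://s3tosensorbucketname/glueProcessed_ke_us/*.csv")
print(bucket)  # bucket part of the URL
print(key)     # folder plus wildcard pattern
```

The bucket comes out as "s3tosensorbucketname" and the key as "glueProcessed_ke_us/*.csv", so the URL already carries everything bucket_name would have supplied.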
Alternatively, if you want to keep bucket_name, do the following:
s3KeySensor = S3KeySensor(
    task_id="Check_file_in_s3_{}".format(country),
    bucket_key=f"glueProcessed_ke_{country}/*.csv",
    bucket_name="s3sensorbucketname",
    wildcard_match=True,
    aws_conn_id="my_s3_conn",
    timeout=18 * 60 * 60,
    poke_interval=10,
    soft_fail=True,
)
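As a rough illustration of what the wildcard in bucket_key can match, the sketch below uses Python's fnmatch module. It assumes the sensor's wildcard matching follows Unix shell-style rules like fnmatch, which may differ between provider versions, so treat it as an approximation. The key names and the country value "us" are invented.

```python
from fnmatch import fnmatchcase

# Pattern as it would appear in bucket_key for country "us" (example value).
pattern = "glueProcessed_ke_us/*.csv"

# Invented object keys standing in for a bucket listing.
keys = [
    "glueProcessed_ke_us/part-0000.csv",
    "glueProcessed_ke_us/nested/part-0001.csv",
    "glueProcessed_ke_us/_SUCCESS",
    "other_folder/part-0000.csv",
]

# fnmatchcase does case-sensitive shell-style matching on every platform.
matches = [k for k in keys if fnmatchcase(k, pattern)]
for k in matches:
    print(k)
```

Only the first two keys match. In fnmatch, "*" also matches "/", so a CSV in a nested subfolder counts as a hit; a marker file without the .csv suffix or a file in another folder does not.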