Dagster 链接资源

Dagster chaining resources

我最近选择了 Dagster 作为 Airflow 的替代品进行评估。

我无法完全理解 资源 的概念,并希望了解我正在尝试做的事情是否可能或可以更好地实现另一种方式。

我有一个像下面这样的助手 class 可以帮助保持代码干燥

from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource

class HelperAwsS3:
    def __init__(self, s3_resource):
        self.s3_resource = s3_resource

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_resource.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix
        )

    def s3_download_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.download_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

    def s3_upload_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.upload_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

s3_resource 实际上是 dagster_aws.s3.s3_resource 这对我有帮助使用我的本地 aws credenitals 连接到 AWS。

当我在 @resource 中进行调用时,我不确定如何将 s3_resource 传递给 HelperAwsS3下面的部分。

@resource
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3()

有什么指点吗?还是我做错了,需要换一种方式?

感谢您的帮助。

我在 dagster Slack 频道上发布了同样的问题,很快就得到了乐于助人的团队的回复。将其张贴在这里,以防对某人有所帮助 -

保留您的 HelperAwsS3 class 并编写您自己的使用 s3 资源的资源,它可能看起来像这样:

@resource(required_resource_keys={"s3"})
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3(s3_resource=context.resources.s3)

(然后确保在模式定义中包含 s3 资源和自定义资源:

@pipeline(mode_defs=[ModeDefinition(
  resource_defs={"s3": s3_resource, "connection_helper_aws_s3": connection_helper_aws_s3_resource}
)]):
  ...