是否可以使用数据库中的配置在 Dagster 中动态生成作业

Is it possible to generate jobs in Dagster dynamically using configuration from database

目前,我的数据库有多个部门。我需要将数据管道应用到所有这些具有不同配置的部门。

我想从数据库加载每个部门的配置。然后使用这些配置在 Dagster 中生成作业列表。

比如我有3个租户:

部门 1:配置 1

部门 2:配置 2

部门 3:配置 3

这些信息存储在我的数据库中。

如何加载这些信息并动态创建 3 个作业(管道):

具有配置 1 的部门 1 的管道 1

具有配置 2 的部门 2 的管道 2

具有配置 3 的部门 3 的管道 3

可以在 Dagster 上实现吗?我可以使用 Airflow(动态生成 DAG)来做到这一点,但不确定如何在 Dagster 中做到这一点。我无法在 Dagster 中加载 op/job 之外的数据库配置。

在 Dagster 中,您的@repository 函数只是一个常规函数,因此您可以运行其中的任意代码来查询您的数据库并动态生成作业:

@repository
def my_repo():
    configs = # some query to your database
    jobs = []
    for config in configs:
         jobs.append(get_job_for_my_config(config))
    return jobs

如果您预计数据库调用可能会花费一些时间,您可以考虑制作您的存储库 lazy-loaded,Dagster RepositoryDefinition docs 详细说明了如何做。