是否可以使用数据库中的配置在 Dagster 中动态生成作业
Is it possible to generate jobs in Dagster dynamically using configuration from database
目前,我的数据库有多个部门。我需要将数据管道应用到所有这些具有不同配置的部门。
我想从数据库加载每个部门的配置。然后使用这些配置在 Dagster 中生成作业列表。
比如我有3个租户:
部门 1:配置 1
部门 2:配置 2
部门 3:配置 3
这些信息存储在我的数据库中。
如何加载这些信息并动态创建 3 个作业(管道):
具有配置 1 的部门 1 的管道 1
具有配置 2 的部门 2 的管道 2
具有配置 3 的部门 3 的管道 3
可以在 Dagster 上实现吗?我可以使用 Airflow(动态生成 DAG)来做到这一点,但不确定如何在 Dagster 中做到这一点。我无法在 Dagster 中加载 op/job 之外的数据库配置。
在 Dagster 中,您的@repository 函数只是一个常规函数,因此您可以运行其中的任意代码来查询您的数据库并动态生成作业:
@repository
def my_repo():
configs = # some query to your database
jobs = []
for config in configs:
jobs.append(get_job_for_my_config(config))
return jobs
如果您预计数据库调用可能会花费一些时间,您可以考虑制作您的存储库 lazy-loaded,Dagster RepositoryDefinition docs 详细说明了如何做。
目前,我的数据库有多个部门。我需要将数据管道应用到所有这些具有不同配置的部门。
我想从数据库加载每个部门的配置。然后使用这些配置在 Dagster 中生成作业列表。
比如我有3个租户:
部门 1:配置 1
部门 2:配置 2
部门 3:配置 3
这些信息存储在我的数据库中。
如何加载这些信息并动态创建 3 个作业(管道):
具有配置 1 的部门 1 的管道 1
具有配置 2 的部门 2 的管道 2
具有配置 3 的部门 3 的管道 3
可以在 Dagster 上实现吗?我可以使用 Airflow(动态生成 DAG)来做到这一点,但不确定如何在 Dagster 中做到这一点。我无法在 Dagster 中加载 op/job 之外的数据库配置。
在 Dagster 中,您的@repository 函数只是一个常规函数,因此您可以运行其中的任意代码来查询您的数据库并动态生成作业:
@repository
def my_repo():
configs = # some query to your database
jobs = []
for config in configs:
jobs.append(get_job_for_my_config(config))
return jobs
如果您预计数据库调用可能会花费一些时间,您可以考虑制作您的存储库 lazy-loaded,Dagster RepositoryDefinition docs 详细说明了如何做。