是否可以使用 Dagster 创建动态作业?
Is it possible to create dynamic jobs with Dagster?
考虑这个示例 - 您需要从源数据库加载 table1,执行一些通用转换(例如为带时间戳的列转换时区)并将结果数据写入 Snowflake。这很简单,可以使用 3 个 dagster 操作来实现。
现在,假设您需要使用 100 个表来做同样的事情。你会如何处理 dagster?您真的需要创建 100 个 jobs/graphs 吗?或者您可以创建一个将执行 100 次的作业吗?您能否限制同时 运行 这些作业的数量?
您有两个主要选择:
- 与 Dynamic Outputs 一起使用单个作业:
使用此设置,您的所有 ETL 都将在一个作业中发生。您将有一个初始操作,它会为您想要为其执行此过程的每个 table 名称生成一个 DynamicOutput,并将其提供给一组操作(可能组织成图表),即 运行 在每个单独的 DynamicOutput 上。
根据您使用的执行程序,可以限制整体步骤并发(例如,默认 multiprocess_executor 支持此选项)。
- 创建一个可配置的作业(我认为这更有可能是您想要的)
from dagster import job, op, graph
import pandas as pd
@op(config_schema={"table_name": str})
def extract_table(context) -> pd.DataFrame:
table_name = context.op_config["table_name"]
# do some load...
return pd.DataFrame()
@op
def transform_table(table: pd.DataFrame) -> pd.DataFrame:
# do some transform...
return table
@op(config_schema={"table_name": str})
def load_table(context, table: pd.DataFrame):
table_name = context.op_config["table_name"]
# load to snowflake...
@job
def configurable_etl():
load_table(transform_table(extract_table()))
# this is what the configuration would look like to extract from table
# src_foo and load into table dest_foo
configurable_etl.execute_in_process(
run_config={
"ops": {
"extract_table": {"config": {"table_name": "src_foo"}},
"load_table": {"config": {"table_name": "dest_foo"}},
}
}
)
在这里,您通过为相关操作提供配置模式来创建一个可以指向源 table 和目标 table 的作业。根据这些配置选项(当您通过 运行 配置创建 运行 时提供),您的作业将在不同的源/目标 tables.
上运行
该示例明确显示 运行使用 python API 执行此作业,但如果您从 Dagit 运行执行此作业,您还可以输入 yaml这个配置的版本在那里。如果你想简化配置模式(如图所示,它非常嵌套),你总是可以创建一个 Config Mapping 来使界面更好:)
从这里,您可以通过为您的作业提供唯一标记并使用 QueuedRunCoordinator 限制该标记的最大并发数 运行 来限制 运行 并发.
考虑这个示例 - 您需要从源数据库加载 table1,执行一些通用转换(例如为带时间戳的列转换时区)并将结果数据写入 Snowflake。这很简单,可以使用 3 个 dagster 操作来实现。
现在,假设您需要使用 100 个表来做同样的事情。你会如何处理 dagster?您真的需要创建 100 个 jobs/graphs 吗?或者您可以创建一个将执行 100 次的作业吗?您能否限制同时 运行 这些作业的数量?
您有两个主要选择:
- 与 Dynamic Outputs 一起使用单个作业:
使用此设置,您的所有 ETL 都将在一个作业中发生。您将有一个初始操作,它会为您想要为其执行此过程的每个 table 名称生成一个 DynamicOutput,并将其提供给一组操作(可能组织成图表),即 运行 在每个单独的 DynamicOutput 上。
根据您使用的执行程序,可以限制整体步骤并发(例如,默认 multiprocess_executor 支持此选项)。
- 创建一个可配置的作业(我认为这更有可能是您想要的)
from dagster import job, op, graph
import pandas as pd
@op(config_schema={"table_name": str})
def extract_table(context) -> pd.DataFrame:
table_name = context.op_config["table_name"]
# do some load...
return pd.DataFrame()
@op
def transform_table(table: pd.DataFrame) -> pd.DataFrame:
# do some transform...
return table
@op(config_schema={"table_name": str})
def load_table(context, table: pd.DataFrame):
table_name = context.op_config["table_name"]
# load to snowflake...
@job
def configurable_etl():
load_table(transform_table(extract_table()))
# this is what the configuration would look like to extract from table
# src_foo and load into table dest_foo
configurable_etl.execute_in_process(
run_config={
"ops": {
"extract_table": {"config": {"table_name": "src_foo"}},
"load_table": {"config": {"table_name": "dest_foo"}},
}
}
)
在这里,您通过为相关操作提供配置模式来创建一个可以指向源 table 和目标 table 的作业。根据这些配置选项(当您通过 运行 配置创建 运行 时提供),您的作业将在不同的源/目标 tables.
上运行该示例明确显示 运行使用 python API 执行此作业,但如果您从 Dagit 运行执行此作业,您还可以输入 yaml这个配置的版本在那里。如果你想简化配置模式(如图所示,它非常嵌套),你总是可以创建一个 Config Mapping 来使界面更好:)
从这里,您可以通过为您的作业提供唯一标记并使用 QueuedRunCoordinator 限制该标记的最大并发数 运行 来限制 运行 并发.