What is the proper partition config for a Dagster job?
I'm currently getting a dagster.core.errors.PartitionExecutionError, but the error log from Dagster doesn't make the cause obvious to me.
dagster.core.errors.PartitionExecutionError: Error occurred during the evaluation of the `run_config_for_partition` function for partition set download_firebase_data_local_partition_set
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 292, in get_partition_config
    return ExternalPartitionConfigData(name=partition.name, run_config=run_config)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 192, in user_code_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
TypeError: daily_download_config() takes 1 positional argument but 2 were given
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 185, in user_code_error_boundary
    yield
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 291, in get_partition_config
    run_config = partition_set_def.run_config_for_partition(partition)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/partition.py", line 441, in run_config_for_partition
    return copy.deepcopy(self._user_defined_run_config_fn_for_partition(partition))
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/time_window_partitions.py", line 192, in <lambda>
    run_config_for_partition_fn=lambda partition: fn(
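The root cause at the bottom of the chain is an ordinary Python TypeError: the wrapper in time_window_partitions.py calls the user's config function with more positional arguments than that function declares. The sketch below reproduces only that mechanism in plain Python, without Dagster. call_partition_config is a hypothetical stand-in for Dagster's wrapper (its real body is truncated in the traceback above), and it assumes the two arguments are the start and end of the daily window, as the answer states.

```python
from datetime import datetime, timedelta


def call_partition_config(fn, partition_start: datetime):
    """Hypothetical stand-in for Dagster's wrapper: it passes both the start
    and the end of a one-day window to the user's config function."""
    partition_end = partition_start + timedelta(days=1)
    return fn(partition_start, partition_end)


def daily_download_config(date: datetime):
    # Declares only one parameter, like the function in the question.
    return {"resources": {"date": date.strftime("%Y-%m-%d")}}


error_message = None
try:
    call_partition_config(daily_download_config, datetime(2021, 12, 1))
except TypeError as exc:
    # Same message as the "caused by" line in the Dagster log.
    error_message = str(exc)
    print(error_message)
```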
My current setup is:
from datetime import datetime

from dagster import (
    ResourceDefinition,
    daily_partitioned_config,
    graph,
    in_process_executor,
    make_values_resource,
)

# NAME, RESOURCES_LOCAL and extract_data_in_date are defined elsewhere in the project.


@graph
def download():
    """
    Download data from BigQuery, then upload it to S3.
    """
    extract_data_in_date()


@daily_partitioned_config(start_date=datetime(2021, 12, 1))
def daily_download_config(date: datetime):
    return {
        "resources": {
            "date": date.strftime("%Y-%m-%d")
        }
    }


download_local_job = download.to_job(
    name=f"{NAME}_local",
    resource_defs={
        "date": make_values_resource(date=str),
        "project_name": ResourceDefinition.hardcoded_resource("test-123"),
        **RESOURCES_LOCAL,
    },
    config=daily_download_config,
    executor_def=in_process_executor,
)
I'm not sure where I went wrong. Can you help?
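A function decorated with @daily_partitioned_config needs to accept two arguments: one for the start of the time window and one for the end. daily_download_config doesn't actually use the end date, but the parameter still has to appear in its signature, because Dagster passes both arguments to the function regardless.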
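A minimal sketch of the corrected config function, assuming the start/end datetime signature described above. The second parameter is named _end here (a hypothetical name; any name works) to signal that it is deliberately unused. The decorator is left off so the sketch runs without Dagster installed. The sketch also nests the value under "config", which is assumed to be the run-config layout for a resource created with make_values_resource(date=str); check that part against your Dagster version, since the question's flat {"date": "..."} form may fail config validation once the signature is fixed.

```python
from datetime import datetime, timedelta


# In the real job this function is decorated with
# @daily_partitioned_config(start_date=datetime(2021, 12, 1)).
def daily_download_config(start: datetime, _end: datetime):
    # Dagster passes both window bounds; only the start date is used.
    return {
        "resources": {
            # Assumed layout: resource config nested under "config",
            # with the key matching make_values_resource(date=str).
            "date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


start = datetime(2021, 12, 1)
run_config = daily_download_config(start, start + timedelta(days=1))
print(run_config)
# {'resources': {'date': {'config': {'date': '2021-12-01'}}}}
```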