测试 dagster 管道
Testing a dagster pipeline
总结:Dagster 运行 Dagit 与 PyTest 的配置似乎与我的项目不兼容
我在尝试 运行 管道上的 pytest 时遇到错误,非常感谢任何指点。我一直收到以下形式的错误:
dagster.core.errors.DagsterInvalidConfigError:
Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
Error 1: Undefined field "myfunc_df_to_list" at path root:solids.
Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] }
write_myfunc_to_redis?:..."
关于项目的几点说明:
- dagster,版本 0.9.15
- 我在 Dagit 中的管道 运行s 对于相同的配置没有错误
- 单元测试 运行 组成管道的单个实体
失败的解决方案: 我已经尝试使用定义输出的实体填充配置文件,因为每个 pytest 错误都建议,但它们都导致错误比前面一个。
我的固体是:
@solid(required_resource_keys={"db"})
def get_myfunc_df(context, query: String) -> myfuncDF:
do something
return myfuncDF
@solid
def myfunc_df_to_list(context, df: myfuncDF) -> List:
do something
return List
@solid(required_resource_keys={"redis"})
def write_myfunc_to_redis(context, myfunc_list:List) -> None:
write to redis return None
而我的管道就是这些实体的链条
@pipeline(
mode_defs=filter_modes(MODES),
preset_defs=filter_presets(PRESETS),
tags={"type": "myproject"},
)
def myfunc_to_redis_pipeline():
df = get_myfunc_df()
myfunc_list = myfunc_df_to_list(df)
write_myfunc_to_redis(myfunc_list)
我在test_main.py中的测试代码是
@pytest.mark.myfunc
def test_myfunc_to_redis_pipeline(self):
res = execute_pipeline(myfunc_to_redis_pipeline,
preset="test",)
assert res.success
assert len(res.solid_result_list) == 4
for solid_res in res.solid_result_list:
assert solid_res.success
在 yaml 文件中使用 运行 配置定义预设“测试”:
resources:
db:
config:
file_path: test.csv
^ 这是它抛出最多错误的地方,我一直在迭代不同的实体排列以添加 ala:
solids:
get_myfunc_df:
inputs:
query:
value: select 1
但是还没有解决问题。尽管在 Dagit 中 运行 宁,只有输入实体需要定义,但是否有任何理由用于测试的实体需要定义它们的输出?
此错误是否表示有其他问题?
编辑:这是来自 tox --verbose
的堆栈跟踪
self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_myfunc_df>
@pytest.mark.myfunc
def test_myfunc_df(self):
"""myfunc"""
result = execute_solid(
get_myfunc_df,
mode_def=test_mode,
run_config=run_config,
> input_values={"query": "SELECT 1"},
)
repos/myfunc/myfunc/dagster/tests/test_main.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x1359f6210>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'
@staticmethod
def build(pipeline_def, run_config=None, mode=None):
"""This method validates a given run config against the pipeline config schema. If
successful, we instantiate an EnvironmentConfig object.
In case the run_config is invalid, this method raises a DagsterInvalidConfigError
"""
from dagster.config.validate import process_config
from dagster.core.definitions.executor import ExecutorDefinition
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.system_storage import SystemStorageDefinition
from .composite_descent import composite_descent
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_str_param(mode, "mode")
mode = mode or pipeline_def.get_default_mode_name()
environment_type = create_environment_type(pipeline_def, mode)
config_evr = process_config(environment_type, run_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for pipeline {}".format(pipeline_def.name),
config_evr.errors,
> run_config,
)
E dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_get_myfunc_df_solid_pipeline
E Error 1: Undefined field "inputs" at path root:solids:get_myfunc_df. Expected: "{ outputs?: [{ result?: { csv: { path: (String | { env: String }) sep?: (String | { env: String }) } parquet: { path: (String | { env: String }) } pickle: { path: (String | { env: String }) } table: { path: (String | { env: String }) } } }] }".
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
_______________________________________________________________________ Test_myfunc.test_write_myfunc_to_redis ________________________________________________________________________
self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_write_myfunc_to_redis>
@pytest.mark.myfunc
def test_write_myfunc_to_redis(self):
"""Test redis write"""
records = [
("k", "v"),
("k2", "v2"),
]
result = execute_solid(
write_myfunc_to_redis,
mode_def=test_mode,
input_values={"myfunc_list": records},
> run_config=run_config,
)
repos/myfunc/myfunc/dagster/tests/test_main.py:56:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x135d39490>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'
@staticmethod
def build(pipeline_def, run_config=None, mode=None):
"""This method validates a given run config against the pipeline config schema. If
successful, we instantiate an EnvironmentConfig object.
In case the run_config is invalid, this method raises a DagsterInvalidConfigError
"""
from dagster.config.validate import process_config
from dagster.core.definitions.executor import ExecutorDefinition
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.system_storage import SystemStorageDefinition
from .composite_descent import composite_descent
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_str_param(mode, "mode")
mode = mode or pipeline_def.get_default_mode_name()
environment_type = create_environment_type(pipeline_def, mode)
config_evr = process_config(environment_type, run_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for pipeline {}".format(pipeline_def.name),
config_evr.errors,
> run_config,
)
E dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
E Error 1: Undefined field "get_myfunc_df" at path root:solids. Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } write_myfunc_to_redis?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
=============================================================================== short test summary info ===============================================================================
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_myfunc_df - dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeli...
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_write_myfunc_to_redis - dagster.core.errors.DagsterInvalidConfigError: Error in conf
下面的解决方案有效
关键问题是管道需要在配置中定义固体,如所写的那样,固体在相同的配置和测试函数中传递 input_values。我的更改是删除“input_values”作为参数并通过 运行 配置传递它们。由于我的间隙实体需要更复杂的对象并且我的配置文件是 yaml,所以我在所有实体测试中添加了以下内容:
this_solid_run_config = copy.deepcopy(run_config)
input_dict = {"df": pd.DataFrame(['1', '2'], columns = ['key', 'value'])}
this_solid_run_config.update({"solids":
{"myfunc_df_to_list":
{"inputs":input_dict
}
}
}
)
根据堆栈跟踪,失败来自于:
result = execute_solid(
get_myfunc_df,
mode_def=test_mode,
run_config=run_config,
input_values={"query": "SELECT 1"},
)
实体输入“查询”应该从“input_values”参数或“run_config”参数传递,但不能同时传递。如果这不能解决您的问题,很高兴继续挖掘。
总结:Dagster 运行 Dagit 与 PyTest 的配置似乎与我的项目不兼容
我在尝试 运行 管道上的 pytest 时遇到错误,非常感谢任何指点。我一直收到以下形式的错误:
dagster.core.errors.DagsterInvalidConfigError:
Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
Error 1: Undefined field "myfunc_df_to_list" at path root:solids.
Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] }
write_myfunc_to_redis?:..."
关于项目的几点说明:
- dagster,版本 0.9.15
- 我在 Dagit 中的管道 运行s 对于相同的配置没有错误
- 单元测试 运行 组成管道的单个实体
失败的解决方案: 我已经尝试使用定义输出的实体填充配置文件,因为每个 pytest 错误都建议,但它们都导致错误比前面一个。
我的固体是:
@solid(required_resource_keys={"db"})
def get_myfunc_df(context, query: String) -> myfuncDF:
do something
return myfuncDF
@solid
def myfunc_df_to_list(context, df: myfuncDF) -> List:
do something
return List
@solid(required_resource_keys={"redis"})
def write_myfunc_to_redis(context, myfunc_list:List) -> None:
write to redis return None
而我的管道就是这些实体的链条
@pipeline(
mode_defs=filter_modes(MODES),
preset_defs=filter_presets(PRESETS),
tags={"type": "myproject"},
)
def myfunc_to_redis_pipeline():
df = get_myfunc_df()
myfunc_list = myfunc_df_to_list(df)
write_myfunc_to_redis(myfunc_list)
我在test_main.py中的测试代码是
@pytest.mark.myfunc
def test_myfunc_to_redis_pipeline(self):
res = execute_pipeline(myfunc_to_redis_pipeline,
preset="test",)
assert res.success
assert len(res.solid_result_list) == 4
for solid_res in res.solid_result_list:
assert solid_res.success
在 yaml 文件中使用 运行 配置定义预设“测试”:
resources:
db:
config:
file_path: test.csv
^ 这是它抛出最多错误的地方,我一直在迭代不同的实体排列以添加 ala:
solids:
get_myfunc_df:
inputs:
query:
value: select 1
但是还没有解决问题。尽管在 Dagit 中 运行 宁,只有输入实体需要定义,但是否有任何理由用于测试的实体需要定义它们的输出?
此错误是否表示有其他问题?
编辑:这是来自 tox --verbose
的堆栈跟踪self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_myfunc_df>
@pytest.mark.myfunc
def test_myfunc_df(self):
"""myfunc"""
result = execute_solid(
get_myfunc_df,
mode_def=test_mode,
run_config=run_config,
> input_values={"query": "SELECT 1"},
)
repos/myfunc/myfunc/dagster/tests/test_main.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x1359f6210>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'
@staticmethod
def build(pipeline_def, run_config=None, mode=None):
"""This method validates a given run config against the pipeline config schema. If
successful, we instantiate an EnvironmentConfig object.
In case the run_config is invalid, this method raises a DagsterInvalidConfigError
"""
from dagster.config.validate import process_config
from dagster.core.definitions.executor import ExecutorDefinition
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.system_storage import SystemStorageDefinition
from .composite_descent import composite_descent
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_str_param(mode, "mode")
mode = mode or pipeline_def.get_default_mode_name()
environment_type = create_environment_type(pipeline_def, mode)
config_evr = process_config(environment_type, run_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for pipeline {}".format(pipeline_def.name),
config_evr.errors,
> run_config,
)
E dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_get_myfunc_df_solid_pipeline
E Error 1: Undefined field "inputs" at path root:solids:get_myfunc_df. Expected: "{ outputs?: [{ result?: { csv: { path: (String | { env: String }) sep?: (String | { env: String }) } parquet: { path: (String | { env: String }) } pickle: { path: (String | { env: String }) } table: { path: (String | { env: String }) } } }] }".
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
_______________________________________________________________________ Test_myfunc.test_write_myfunc_to_redis ________________________________________________________________________
self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_write_myfunc_to_redis>
@pytest.mark.myfunc
def test_write_myfunc_to_redis(self):
"""Test redis write"""
records = [
("k", "v"),
("k2", "v2"),
]
result = execute_solid(
write_myfunc_to_redis,
mode_def=test_mode,
input_values={"myfunc_list": records},
> run_config=run_config,
)
repos/myfunc/myfunc/dagster/tests/test_main.py:56:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x135d39490>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'
@staticmethod
def build(pipeline_def, run_config=None, mode=None):
"""This method validates a given run config against the pipeline config schema. If
successful, we instantiate an EnvironmentConfig object.
In case the run_config is invalid, this method raises a DagsterInvalidConfigError
"""
from dagster.config.validate import process_config
from dagster.core.definitions.executor import ExecutorDefinition
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.system_storage import SystemStorageDefinition
from .composite_descent import composite_descent
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_str_param(mode, "mode")
mode = mode or pipeline_def.get_default_mode_name()
environment_type = create_environment_type(pipeline_def, mode)
config_evr = process_config(environment_type, run_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for pipeline {}".format(pipeline_def.name),
config_evr.errors,
> run_config,
)
E dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
E Error 1: Undefined field "get_myfunc_df" at path root:solids. Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } write_myfunc_to_redis?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
=============================================================================== short test summary info ===============================================================================
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_myfunc_df - dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeli...
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_write_myfunc_to_redis - dagster.core.errors.DagsterInvalidConfigError: Error in conf
下面的解决方案有效 关键问题是管道需要在配置中定义固体,如所写的那样,固体在相同的配置和测试函数中传递 input_values。我的更改是删除“input_values”作为参数并通过 运行 配置传递它们。由于我的间隙实体需要更复杂的对象并且我的配置文件是 yaml,所以我在所有实体测试中添加了以下内容:
this_solid_run_config = copy.deepcopy(run_config)
input_dict = {"df": pd.DataFrame(['1', '2'], columns = ['key', 'value'])}
this_solid_run_config.update({"solids":
{"myfunc_df_to_list":
{"inputs":input_dict
}
}
}
)
根据堆栈跟踪,失败来自于:
result = execute_solid(
get_myfunc_df,
mode_def=test_mode,
run_config=run_config,
input_values={"query": "SELECT 1"},
)
实体输入“查询”应该从“input_values”参数或“run_config”参数传递,但不能同时传递。如果这不能解决您的问题,很高兴继续挖掘。