在 dagster 中处理用户输入
Handling user input in dagster
我是 dagster 的新手,我想了解它是如何处理用户输入的。我正在使用以下代码对此进行测试:
from dagster import job, op
@op
def input_string():
ret = input('Enter string')
print(ret)
@job
def my_job():
input_string()
if __name__ == '__main__':
my_job.execute_in_process()
然后我 运行 在控制台中输入以下内容:
dagit -f test.py
然而,当我最终“启动 运行”时,我没有机会输入,而是得到一个包含以下信息的 EOFError:
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred
while executing op "input_string": File
"C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_plan.py",
line 232, in dagster_event_sequence_for_step
for step_event in check.generator(step_events): File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py",
line 354, in core_dagster_event_sequence_for_step
for user_event in check.generator( File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py",
line 70, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence: File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py",
line 170, in execute_core_compute
for step_output in yield_compute_results(step_context, inputs, compute_fn): File
"C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py",
line 138, in yield_compute_results
for event in iterate_with_context( File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils_init.py",
line 403, in iterate_with_context
return File "C:\Users\username\Anaconda3\lib\contextlib.py", line 137, in exit
self.gen.throw(typ, value, traceback) File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py",
line 73, in solid_execution_error_boundary
raise error_cls( The above exception was caused by the following exception: EOFError: EOF when reading a line File
"C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py",
line 47, in solid_execution_error_boundary
yield File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils_init.py",
line 401, in iterate_with_context
next_output = next(iterator) File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute_generator.py",
line 65, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs) File "test.py", line 14, in input_string
ret = input('Enter string')
我怎样才能把它送到 运行?
ops 使用 config schema. This allows you to provide configuration via the Dagit Launchpad
配置
在您的情况下,您希望从 @op
代码中删除 input
调用。然后,您将使用 context.op_config
字典从提供给您的操作的配置对象中检索输入,如下所示:
@op(config_schema={'input1': str})
def input_string(context):
ret = context.op_config['input1']
print(ret)
@job
def my_job():
input_string()
if __name__ == '__main__':
my_job.execute_in_process()
编辑:要让您的输入在 Dagster 作业控制台中打印,请使用 built-in Dagster logger,如下所示:
@op(config_schema={'input1': str})
def input_string(context):
ret = context.op_config['input1']
context.log.info(ret)
我是 dagster 的新手,我想了解它是如何处理用户输入的。我正在使用以下代码对此进行测试:
from dagster import job, op
@op
def input_string():
ret = input('Enter string')
print(ret)
@job
def my_job():
input_string()
if __name__ == '__main__':
my_job.execute_in_process()
然后我 运行 在控制台中输入以下内容:
dagit -f test.py
然而,当我最终“启动 运行”时,我没有机会输入,而是得到一个包含以下信息的 EOFError:
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "input_string": File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_plan.py", line 232, in dagster_event_sequence_for_step for step_event in check.generator(step_events): File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py", line 354, in core_dagster_event_sequence_for_step for user_event in check.generator( File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py", line 70, in _step_output_error_checked_user_event_sequence for user_event in user_event_sequence: File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 170, in execute_core_compute for step_output in yield_compute_results(step_context, inputs, compute_fn): File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 138, in yield_compute_results for event in iterate_with_context( File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils_init.py", line 403, in iterate_with_context return File "C:\Users\username\Anaconda3\lib\contextlib.py", line 137, in exit self.gen.throw(typ, value, traceback) File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py", line 73, in solid_execution_error_boundary raise error_cls( The above exception was caused by the following exception: EOFError: EOF when reading a line File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py", line 47, in solid_execution_error_boundary yield File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils_init.py", line 401, in iterate_with_context next_output = next(iterator) File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute_generator.py", line 65, in _coerce_solid_compute_fn_to_iterator result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs) File "test.py", line 14, in input_string ret = input('Enter string')
我怎样才能把它送到 运行?
ops 使用 config schema. This allows you to provide configuration via the Dagit Launchpad
配置在您的情况下,您希望从 @op
代码中删除 input
调用。然后,您将使用 context.op_config
字典从提供给您的操作的配置对象中检索输入,如下所示:
@op(config_schema={'input1': str})
def input_string(context):
ret = context.op_config['input1']
print(ret)
@job
def my_job():
input_string()
if __name__ == '__main__':
my_job.execute_in_process()
编辑:要让您的输入在 Dagster 作业控制台中打印,请使用 built-in Dagster logger,如下所示:
@op(config_schema={'input1': str})
def input_string(context):
ret = context.op_config['input1']
context.log.info(ret)