Handling user input in dagster

I'm new to dagster and would like to understand how it handles user input. I'm testing this with the following code:

from dagster import job, op


@op
def input_string():
    ret = input('Enter string')
    print(ret)


@job
def my_job():
    input_string()


if __name__ == '__main__':
    my_job.execute_in_process()

I then run the following in the console:

dagit -f test.py

However, when I finally click "Launch Run", I never get a chance to type anything. Instead I get an EOFError with the following message:

dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "input_string":
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_plan.py", line 232, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py", line 354, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py", line 70, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 170, in execute_core_compute
    for step_output in yield_compute_results(step_context, inputs, compute_fn):
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 138, in yield_compute_results
    for event in iterate_with_context(
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils\__init__.py", line 403, in iterate_with_context
    return
  File "C:\Users\username\Anaconda3\lib\contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py", line 73, in solid_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
EOFError: EOF when reading a line
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py", line 47, in solid_execution_error_boundary
    yield
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils\__init__.py", line 401, in iterate_with_context
    next_output = next(iterator)
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute_generator.py", line 65, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "test.py", line 14, in input_string
    ret = input('Enter string')

How can I get this to run?

Ops are configured using a config schema. This allows you to provide configuration via the Dagit Launchpad.
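
As background on the EOFError in the question: input() raises it whenever standard input is already at end-of-file, which is most likely what happens when a run is launched from Dagit, since that process has no interactive console attached. The sketch below reproduces the same error with only the standard library. It does not use Dagster at all; the child process with its stdin redirected to the null device is an assumed stand-in for the process that executes the op.

```python
import subprocess
import sys

# Code the child process will run: the same input() call as in the op.
child_code = "input('Enter string')"

# Start a child Python process whose stdin is the null device, so there is
# nothing to read. This is an assumed stand-in for a run launched from Dagit.
result = subprocess.run(
    [sys.executable, "-c", child_code],
    stdin=subprocess.DEVNULL,
    capture_output=True,
    text=True,
)

# The child exits with a non-zero code and its traceback ends in EOFError.
print(result.returncode)
print(result.stderr.strip().splitlines()[-1])
```

Because the failure comes from the missing console rather than from the op itself, the value has to reach the op some other way, which is what config is for.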

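In your case, you'll want to remove the input call from your @op code. You then retrieve the value from the config object provided to your op, using the context.op_config dictionary, like so: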

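from dagster import job, op


@op(config_schema={'input1': str})
def input_string(context):
    # Read the value from the op's config instead of calling input()
    ret = context.op_config['input1']
    print(ret)


@job
def my_job():
    input_string()


if __name__ == '__main__':
    # Outside Dagit the required config has to be passed explicitly;
    # 'hello' is only an example value
    my_job.execute_in_process(
        run_config={'ops': {'input_string': {'config': {'input1': 'hello'}}}}
    )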
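
If you still want to type the value interactively when running the script directly, one option is to collect it in the script, where a console does exist, and hand it to the job as run config. The following is a minimal sketch under that assumption; build_run_config is a hypothetical helper name, not part of Dagster, and it only builds the nested dictionary that the op's config_schema expects.

```python
def build_run_config(user_text: str) -> dict:
    """Build run config for the input_string op (hypothetical helper).

    The nesting is: ops -> <op name> -> config -> <config field>.
    In the Dagit Launchpad the same structure is entered as YAML:

        ops:
          input_string:
            config:
              input1: "hello"
    """
    return {"ops": {"input_string": {"config": {"input1": user_text}}}}


# Example value standing in for text typed by the user.
print(build_run_config("hello"))

# Possible usage in the script's __main__ block (requires Dagster and my_job):
#     text = input("Enter string")
#     my_job.execute_in_process(run_config=build_run_config(text))
```

This keeps input() out of the op, so the same job works both from the script and from the Launchpad.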

Edit: To have your input printed in the Dagster job console, use the built-in Dagster logger, like so:

@op(config_schema={'input1': str})
def input_string(context):
    ret = context.op_config['input1']
    context.log.info(ret)