Luigi 可以传播异常或 return 任何结果吗?

Can Luigi propagate exception or return any result?

我正在使用 Luigi 启动一些管道。 举个简单的例子

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

现在假设 myTask 在执行期间引发异常。我所能拥有的只是 luigi 显示异常的日志。

luigi 有什么方法可以传播它或至少 return 一个 failure 状态吗?

然后我就可以让我的程序根据该状态做出反应。

谢谢。

编辑 当我存储结果时,我忘记指定 luigi 的输出以数据库为目标。如果引发异常,则不会存储任何结果,但不会将异常传播出 luigi。我想知道 luigi 是否可以选择这个。

您可以做的是将错误写入文件。例如,在您可能会失败的任务中(我们称之为 TaskA):

x=""
try:
    do stuff
except:
    x="error!"
with open('errorfile.log','w') as f:
    f.write(x)

然后,在依赖于该错误的任务中,该任务将需要 TaskA。你可以这样做:

with open('errorfile.log','r') as f:
    if f.read()://if anything is in the error log from TaskA
        //error occurred
        do stuff
    else:
        do other stuff

来自docs

Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).

示例:

import luigi

from my_tasks import MyTask


@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """

    do_something()


luigi.run()

Luigi lot of events you can choose from. You can also take a look at this tests 是为了学习如何倾听其他事件并做出反应。