Luigi 可以传播异常或 return 任何结果吗?
Can Luigi propagate exception or return any result?
我正在使用 Luigi 启动一些管道。
举个简单的例子
task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()
现在假设 myTask
在执行期间引发异常。我所能拥有的只是 luigi 显示异常的日志。
luigi 有什么方法可以传播它或至少 return 一个 failure
状态吗?
然后我就可以让我的程序根据该状态做出反应。
谢谢。
编辑
当我存储结果时,我忘记指定 luigi 的输出以数据库为目标。如果引发异常,则不会存储任何结果,但不会将异常传播出 luigi。我想知道 luigi 是否可以选择这个。
您可以做的是将错误写入文件。例如,在您可能会失败的任务中(我们称之为 TaskA):
x=""
try:
do stuff
except:
x="error!"
with open('errorfile.log','w') as f:
f.write(x)
然后,在依赖于该错误的任务中,该任务将需要 TaskA。你可以这样做:
with open('errorfile.log','r') as f:
if f.read()://if anything is in the error log from TaskA
//error occurred
do stuff
else:
do other stuff
来自docs:
Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).
示例:
import luigi
from my_tasks import MyTask
@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
"""Will be called directly after a failed execution
of `run` on any MyTask subclass
"""
do_something()
luigi.run()
Luigi lot of events you can choose from. You can also take a look at this tests 是为了学习如何倾听其他事件并做出反应。
我正在使用 Luigi 启动一些管道。 举个简单的例子
task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()
现在假设 myTask
在执行期间引发异常。我所能拥有的只是 luigi 显示异常的日志。
luigi 有什么方法可以传播它或至少 return 一个 failure
状态吗?
然后我就可以让我的程序根据该状态做出反应。
谢谢。
编辑 当我存储结果时,我忘记指定 luigi 的输出以数据库为目标。如果引发异常,则不会存储任何结果,但不会将异常传播出 luigi。我想知道 luigi 是否可以选择这个。
您可以做的是将错误写入文件。例如,在您可能会失败的任务中(我们称之为 TaskA):
x=""
try:
do stuff
except:
x="error!"
with open('errorfile.log','w') as f:
f.write(x)
然后,在依赖于该错误的任务中,该任务将需要 TaskA。你可以这样做:
with open('errorfile.log','r') as f:
if f.read()://if anything is in the error log from TaskA
//error occurred
do stuff
else:
do other stuff
来自docs:
Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).
示例:
import luigi
from my_tasks import MyTask
@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
"""Will be called directly after a failed execution
of `run` on any MyTask subclass
"""
do_something()
luigi.run()
Luigi lot of events you can choose from. You can also take a look at this tests 是为了学习如何倾听其他事件并做出反应。