python luigi : requires() can not return Target objects
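I'm new to Luigi and I want to set it up to run my API calls.
I'm using MockFiles because the JSON objects I retrieve through the API are lightweight and I want to avoid using an external database.
Here is my code: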
import luigi
from luigi import Task, run as runLuigi, mock as LuigiMock
import yaml

class getAllCountries(Task):
    task_complete = False
    def requires(self):
        return LuigiMock.MockFile("allCountries")
    def run(self):
        sync = Sync()
        # Get list of all countries
        countries = sync.getAllCountries()
        if(countries is None or len(countries) == 0):
            Logger.error("Sync terminated. The country array is null")
        object_to_send = yaml.dump(countries)
        _out = self.output().open('r')
        _out.write(object_to_send)
        _out.close()
        task_complete = True
    def complete(self):
        return self.task_complete

class getActiveCountries(Task):
    task_complete = False
    def requires(self):
        return getAllCountries()
    def run(self):
        _in = self.input().read('r')
        serialised = _in.read()
        countries = yaml.load(serialised)
        doSync = DoSync()
        activeCountries = doSync.getActiveCountries(countries)
        if(activeCountries is None or len(activeCountries) == 0):
            Logger.error("Sync terminated. The active country account array is null")
        task_complete = True
    def complete(self):
        return self.task_complete

if __name__ == "__main__":
    runLuigi()
I run the project with the following command:
PYTHONPATH='.' luigi --module app getActiveCountries --workers 2 --local-scheduler
It fails, and this is the stack trace I get:
DEBUG: Checking if getActiveCountries() is complete
DEBUG: Checking if getAllCountries() is complete
INFO: Informed scheduler that task getActiveCountries__99914b932b has status PENDING
ERROR: Luigi unexpected framework error while scheduling getActiveCountries()
Traceback (most recent call last):
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 763, in add
    for next in self._add(item, is_complete):
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 861, in _add
    self._validate_dependency(d)
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 886, in _validate_dependency
    raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
INFO: Worker Worker(salt=797067816, workers=2, host=xxx, pid=85795) was stopped. Shutting down Keep-Alive thread
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/retcodes.py", line 75, in run_with_retcodes
    worker = luigi.interface._run(argv).worker
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/interface.py", line 211, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/interface.py", line 171, in _schedule_and_run
    success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 763, in add
    for next in self._add(item, is_complete):
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 861, in _add
    self._validate_dependency(d)
  File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 886, in _validate_dependency
    raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
Also, I'm running luigid in the background, and I don't see any tasks in it, whether they fail or not.
Any ideas?
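First, you don't see anything happening in the luigi daemon because you passed --local-scheduler on the command line. That flag bypasses the daemon entirely and just runs a scheduler inside the local process.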
Second, in the getAllCountries task you declared the target as a requirement, when it belongs in your output function. Change it from:
def requires(self):
    return LuigiMock.MockFile("allCountries")
to:
def output(self):
    return LuigiMock.MockFile("allCountries")
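Once you do, you don't need to override the complete function or set task_complete to True, because Luigi decides whether a task is complete based on whether its output exists. (As written, task_complete = True only creates a local variable inside run(), so it never changed self.task_complete anyway.) To learn more about targets, see: https://luigi.readthedocs.io/en/stable/workflows.html#target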
Side note: you can make this part:
_out = self.output().open('r')
_out.write(object_to_send)
_out.close()
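less error-prone by simply using Python's with statement. Note also that the target has to be opened with 'w' to write to it; 'r' opens it for reading.
with self.output().open('w') as _out:
    _out.write(object_to_send)
Python will automatically close the file when it leaves the with block, even if an error is raised inside it.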
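A small, Luigi-independent illustration of that guarantee using an ordinary local file; the helper name `write_then_fail`, the file name and the simulated failure are all made up for the example:

```python
import os
import tempfile


def write_then_fail(path):
    """Write to `path` inside a with block, then raise to simulate a failing API call.

    Returns the file object so the caller can inspect it afterwards.
    """
    handle = None
    try:
        with open(path, "w") as handle:
            handle.write("partial data")
            raise RuntimeError("simulated API failure")
    except RuntimeError:
        pass  # swallowed only so the handle can be inspected below
    return handle


path = os.path.join(tempfile.mkdtemp(), "countries.txt")
f = write_then_fail(path)
print(f.closed)  # True: leaving the with block closed the file despite the exception
```

With the manual open()/close() version, the exception would skip the close() call and leave the handle open.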
Second side note: don't use luigi.run. It's deprecated. Use luigi.build instead.