尝试执行 luigi 任务时如何修复 "luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task'>"?
How to fix "luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task'>" when trying to execute luigi tasks?
我是 Luigi 的新手,我创建了一个管道,它从数据库中获取数据,转换数据,然后将其加载回数据库。我在其中创建了四个任务。但是,当我在 cmd 或 Pycharm 上执行任务时,它说它无法安排非任务。下面是我的管道的伪代码。
每个任务的参数不是输入而是从其他文件中获取的。
class Task1(luigi.Task):
# Some Parameters
def get_target():
def query():
def run():
class Task2(luigi.Task):
# Some Parameters
def requires():
return Task1()
def func1():
def func2():
def run()
class Task3(luigi.Task):
# Some Parameters
def requires():
return Task2()
def run():
class Task4(luigi.Task):
# Some Parameters
def requires():
return Task3()
def run():
在 Pycharm 上,我使用了
if __name__ == '__main__':
luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)
在 cmd 上,我使用了
python .\folder\file.py Task1
但它给了我这个错误
INFO: Worker Worker was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
File "D:/folder/file.py", line 300, in <module>
luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)
File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 237, in build
luigi_run_result = _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)
File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 171, in _schedule_and_run
success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 740, in add
self._validate_task(task)
File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 638, in _validate_task
raise TaskException('Can not schedule non-task %s' % task)
luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task1'>
首先,您需要为所有任务指定输出。没有输出,luigi 不知道任务何时完成。其次(这是你的实际问题所在)你没有实例化你正在传递的任务。你只需要创建实例,所以试试:
luigi.build([Task1(), Task2(), Task3(), Task4()], workers=5, local_scheduler=True)
但是,我觉得有必要指出另外两点:
1) 由于你的每个任务都指定了之前需要的任务,你只需要告诉 luigi 运行 链中的最后一个任务,所以:
luigi.build([Task4()], workers=5, local_scheduler=True)
这将告诉 luigi 它需要完成任务 4。要完成任务 4,luigi 将查看任务 4 需要什么,然后查看任务 3。然后它将查看任务 3 需要什么 运行 和请参阅任务 2 等。Luigi 将自动为您构建图表,并 运行 它们按照满足每个任务依赖项的顺序构建。
最后,您在 .build
中给 luigi 任务的顺序对整体影响不大。这是因为 luigi 不是根据你给它的顺序来确定顺序的,而是根据依赖关系图和 task priority.
编辑:如果你需要为另一个任务要求多个任务,你可以简单地做:
class Task4(luigi.Task):
def requires(self):
return [Task1(), Task2(), Task3()]
我是 Luigi 的新手,我创建了一个管道,它从数据库中获取数据,转换数据,然后将其加载回数据库。我在其中创建了四个任务。但是,当我在 cmd 或 Pycharm 上执行任务时,它说它无法安排非任务。下面是我的管道的伪代码。 每个任务的参数不是输入而是从其他文件中获取的。
class Task1(luigi.Task):
# Some Parameters
def get_target():
def query():
def run():
class Task2(luigi.Task):
# Some Parameters
def requires():
return Task1()
def func1():
def func2():
def run()
class Task3(luigi.Task):
# Some Parameters
def requires():
return Task2()
def run():
class Task4(luigi.Task):
# Some Parameters
def requires():
return Task3()
def run():
在 Pycharm 上,我使用了
if __name__ == '__main__':
luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)
在 cmd 上,我使用了
python .\folder\file.py Task1
但它给了我这个错误
INFO: Worker Worker was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
File "D:/folder/file.py", line 300, in <module>
luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)
File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 237, in build
luigi_run_result = _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)
File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 171, in _schedule_and_run
success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 740, in add
self._validate_task(task)
File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 638, in _validate_task
raise TaskException('Can not schedule non-task %s' % task)
luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task1'>
首先,您需要为所有任务指定输出。没有输出,luigi 不知道任务何时完成。其次(这是你的实际问题所在)你没有实例化你正在传递的任务。你只需要创建实例,所以试试:
luigi.build([Task1(), Task2(), Task3(), Task4()], workers=5, local_scheduler=True)
但是,我觉得有必要指出另外两点:
1) 由于你的每个任务都指定了之前需要的任务,你只需要告诉 luigi 运行 链中的最后一个任务,所以:
luigi.build([Task4()], workers=5, local_scheduler=True)
这将告诉 luigi 它需要完成任务 4。要完成任务 4,luigi 将查看任务 4 需要什么,然后查看任务 3。然后它将查看任务 3 需要什么 运行 和请参阅任务 2 等。Luigi 将自动为您构建图表,并 运行 它们按照满足每个任务依赖项的顺序构建。
最后,您在 .build
中给 luigi 任务的顺序对整体影响不大。这是因为 luigi 不是根据你给它的顺序来确定顺序的,而是根据依赖关系图和 task priority.
编辑:如果你需要为另一个任务要求多个任务,你可以简单地做:
class Task4(luigi.Task):
def requires(self):
return [Task1(), Task2(), Task3()]