Luigi task returns unfulfilled dependency at run time when dependency is complete
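I'm fairly new to building pipelines with Luigi, and I'd like to understand why my small workflow ends in an unfulfilled dependency. I'm trying to run the task StageProviders(), which has one dependency, ErrorsLogFile(). The tasks that must run before StageProviders simply create blank files on a shared drive. When I try to run the StageProviders task in the workflow below, I get the message shown further down:
Code: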
#!/usr/local/bin/python
import luigi
import os
import shutil
import time
import pandas as pd
import time

class DupsExistingLogFile(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return None

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
        dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))
        shutil.copyfile(src_blank_file_str, dest_file_str)

class DupsLogFile(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return DupsExistingLogFile(self.filename)

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
        dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))
        shutil.copyfile(src_blank_file_str, dest_file_str)

class ErrorsLogFile(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return DupsLogFile(self.filename)

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errprs.xlsx"))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
        dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errors.xlsx"))
        shutil.copyfile(src_blank_file_str, dest_file_str)

class StageProviders(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return ErrorsLogFile(self.filename)

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt'))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        filepath_str = '/root/etc/mnt/Import/' + self.filename
        xls_file = pd.ExcelFile(filepath_str)
        df = xls_file.parse('Sheet1')
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS.txt')
        dest_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt')
        if not df.empty:
            shutil.copyfile(src_blank_file_str, dest_file_str)
            with self.output().open('w') as out_file:
                for name in df['NP']:
                    print(name, end='\n', file=out_file)
Output:
root@ubuntu:~/pythonfiles/luigi_POC/cpi_luigi_poc/src# python3 -m luigi --module provider_import StageProviders --filename CCM_provider_sample.xlsx --local-scheduler
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsExistingLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Informed scheduler that task DupsExistingLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DupsExistingLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running DupsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done DupsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status FAILED
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 3 ran successfully:
- 1 DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
- 1 DupsLogFile(filename=CCM_provider_sample.xlsx)
- 1 ErrorsLogFile(filename=CCM_provider_sample.xlsx)
* 1 failed:
- 1 StageProviders(filename=CCM_provider_sample.xlsx)
This progress looks :( because there were failed tasks
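It looks like this is because of this message:
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
However, reading the output, it seems that ErrorsLogFile_CCM_provider_sam_ad65b206fd completed before StageProviders ran? Why is the scheduler reporting an unfulfilled dependency? I believe I'm misunderstanding how to "chain" tasks together. I just want the StageProviders task to run after the ErrorsLogFile, DupsLogFile, and DupsExistingLogFile tasks have completed successfully.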
There is a typo in the output of your ErrorsLogFile task (which, unfortunately, was not copied over to your run method):
[...] + timestr + "_Errprs.xlsx"))
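So the task runs fine and gets status DONE, but when StageProviders checks its requirements, it calls the complete method of ErrorsLogFile, which returns false because the declared file does not exist; hence the "Unfulfilled dependency at run time" error.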
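To make the mismatch concrete, here is a minimal sketch that needs only the standard library. It imitates the default completeness rule (a task counts as complete when its output file exists) rather than calling Luigi itself. The helper names `declared_output` and `written_file`, the temporary directory, and the date value are assumptions made up for illustration; they are not part of Luigi or of the code in the question.

```python
import os
import tempfile


def declared_output(log_dir, stem, day):
    # Path that output() declares in the question (note the "_Errprs" typo).
    return os.path.join(log_dir, stem + "_" + day + "_Errprs.xlsx")


def written_file(log_dir, stem, day):
    # Path that run() actually writes to in the question.
    return os.path.join(log_dir, stem + "_" + day + "_Errors.xlsx")


with tempfile.TemporaryDirectory() as log_dir:
    stem, day = "CCM_provider_sample", "2017-01-01"  # placeholder values
    # run() finishes without raising, so the task is reported as DONE ...
    with open(written_file(log_dir, stem, day), "w") as handle:
        handle.write("")
    # ... but the completeness check looks at the declared path instead.
    dependency_complete = os.path.exists(declared_output(log_dir, stem, day))
    file_was_written = os.path.exists(written_file(log_dir, stem, day))

print(dependency_complete, file_was_written)
```

The file is written, yet the check on the declared path fails, which is the situation the traceback reports for ErrorsLogFile.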
This error generally means that a task's state changed during the execution of the workflow.
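One way to rule out this kind of mismatch is to assemble the file name in exactly one place and have both the write and the completeness check use it. The sketch below shows the idea with a hypothetical stand-in class, `ErrorsLogFileSketch`, built on the standard library only; it is not the Luigi API, and the template file `blank.xlsx` is a placeholder.

```python
import os
import shutil
import tempfile
import time


class ErrorsLogFileSketch:
    """Hypothetical stand-in for a Luigi task; not the Luigi API."""

    def __init__(self, log_dir, filename, blank_file):
        self.log_dir = log_dir
        self.filename = filename
        self.blank_file = blank_file

    def output_path(self):
        # The only place where the file name is assembled.
        stem = os.path.splitext(self.filename)[0]
        day = time.strftime("%Y-%m-%d")
        return os.path.join(self.log_dir, stem + "_" + day + "_Errors.xlsx")

    def run(self):
        # Write to exactly the path that the completeness check inspects.
        shutil.copyfile(self.blank_file, self.output_path())

    def complete(self):
        # Same rule as Luigi's default: complete when the output exists.
        return os.path.exists(self.output_path())


with tempfile.TemporaryDirectory() as log_dir:
    blank = os.path.join(log_dir, "blank.xlsx")  # placeholder template file
    with open(blank, "w") as handle:
        handle.write("")
    task = ErrorsLogFileSketch(log_dir, "CCM_provider_sample.xlsx", blank)
    complete_before = task.complete()
    task.run()
    complete_after = task.complete()

print(complete_before, complete_after)
```

In a real Luigi task the same idea would presumably amount to having run() copy to `self.output().path` instead of rebuilding the destination string by hand.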