Luigi "tasks not granted run permission by the scheduler"
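I am trying to create Luigi tasks dynamically (based on the attributes in cmdList) and make each previous task a dependency of the next one. SQLTask is a subclass of luigi.Task. However, when I run this code I get: This progress looks :| because there were tasks that were not granted run permission by the scheduler
What am I missing?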
import subprocess

import luigi

# SQLTask is the asker's own luigi.Task subclass (definition not shown)


class BDX_Task(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()

    def run(self):
        print(subprocess.call(self.queryCmd, shell=True))
        self.get_target().touch()


def dep_s_dep(cmdList, dep1):
    """
    This returns dependency task's dependency
    """
    dep2 = [(key, cmd, dep) for key, cmd, dep in cmdList if key == dep1]
    return dep2[0]


class BDX_Query_0XX(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()
    runDesc = luigi.Parameter()
    depend_task = ""

    def run(self):
        YY = self.acctDate[:4]
        MM = self.acctDate[4:6]
        acctDate = self.acctDate
        ssisDate = self.ssisDate
        runDesc = self.runDesc
        bdx_sql = 'r:\\1.SQL\\BDX_SQL\\'  # a raw string literal cannot end with a backslash
        cmdList = [
            ('BDX010', f'{bdx_sql}BDX_001_NI_DM 010.sql -o output010.txt', None),
            ('BDX020', f'{bdx_sql}BDX_001_NI_DM 020.sql -o output020.txt', 'BDX010'),
            ('BDX022a', f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql -o output022a.txt', 'BDX020'),
            ('BDX022b', f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql -o output022b.txt -v Year1={YY} MM={MM}', 'BDX022a'),
            ('BDX022c', f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql -o output022c.txt -v Year={YY} Month={MM}', 'BDX022b'),
            ('BDX023', f'{bdx_sql}BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt ', 'BDX020'),
            ('BDX024', f'{bdx_sql}BDX_024_P031_ITD_All_Final_CatAdj.sql -o output024.txt', 'BDX020'),
            ('BDX025a', f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 010.sql -o output025a.txt', 'BDX020'),
            ('BDX025b', f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 020.sql -o output025b.txt -v Year={YY} Month={MM}', 'BDX025a'),
            ('BDX025c', f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 030.sql -o output025c.txt -v YYMM={acctDate}', 'BDX025b'),
        ]
        tasks = []
        for queryKey, queryCmd, dependQry in cmdList:
            class_name = queryKey
            klass = type(queryKey, (BDX_Task,), {})  # {'acctDate': self.acctDate, 'queryKey': queryKey, 'queryCmd': queryCmd, 'runDesc': self.runDesc, 'dependQry': dependQry})
            if dependQry != '':
                dep1 = dep_s_dep(cmdList, dependQry)  # info about dependency task (key, cmd, dep's dep)
                print(f"{queryKey}'s dep1", dep1)
                depend_task = [globals()[dependQry](acctDate=self.acctDate,
                                                    ssisDate=self.ssisDate,
                                                    queryKey=dep1[0],
                                                    queryCmd=dep1[1],
                                                    runDesc=self.runDesc,
                                                    dependQry=dep1[2])]

                def requires1(cls):
                    return depend_task
                setattr(klass, "requires", classmethod(requires1))
            globals()[queryKey] = klass  # make the class available at the module level
            # this adds a Task class named after queryKey to the dependencies
            tasks.append(globals()[queryKey](acctDate=self.acctDate, ssisDate=self.ssisDate,
                                             queryKey=queryKey, queryCmd=queryCmd,
                                             runDesc=self.runDesc, dependQry=dependQry))
        yield tasks
        self.get_target().touch()
=========== Stack trace
C:\ProgramData\Anaconda3\python.exe R:/1.PY/DataPipeLine/run_BDX_process.py BDX_Query_Main --local-scheduler
DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_9XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1) is complete
INFO: Informed scheduler that task BDX_Query_Main_201904_201905_444c47aebc has status PENDING
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1) is complete
BDX020's dep1 ('BDX010', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ', '')
BDX022a's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX022b's dep1 ('BDX022a', '"r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ', 'BDX020')
BDX022c's dep1 ('BDX022b', '"r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04', 'BDX022a')
BDX023's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX024's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX025a's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX025b's dep1 ('BDX025a', '"r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ', 'BDX020')
BDX025c's dep1 ('BDX025b', '"r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04', 'BDX025a')
INFO: Informed scheduler that task BDX_Query_9XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
INFO: Informed scheduler that task BDX_Query_0XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 6820] Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) running BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
INFO: [pid 6820] Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) new requirements BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=) is complete
INFO: Informed scheduler that task BDX010_201904___r__1_SQL_BDX_SQ_c7c8473ba5 has status DONE
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=201904 Luigi test1, dependQry=BDX010) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX020_201904_BDX010__r__1_SQL_BDX_SQ_573b857d50 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_7a4a9cc485 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04, runDesc=201904 Luigi test1, dependQry=BDX022a) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_313dc66c50 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX022b) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_d198713a82 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX023(acctDate=201904, ssisDate=201905, queryKey=BDX023, queryCmd="r:.SQL\BDX_SQL\BDX_023_P031_MTD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX023_201904_BDX020__r__1_SQL_BDX_SQ_236e57639e has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX024(acctDate=201904, ssisDate=201905, queryKey=BDX024, queryCmd="r:.SQL\BDX_SQL\BDX_024_P031_ITD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX024_201904_BDX020__r__1_SQL_BDX_SQ_1a8ad5a673 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025a(acctDate=201904, ssisDate=201905, queryKey=BDX025a, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025a_201904_BDX020__r__1_SQL_BDX_SQ_91bd598abf has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025c(acctDate=201904, ssisDate=201905, queryKey=BDX025c, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 030.sql" -v YYMM=201904, runDesc=201904 Luigi test1, dependQry=BDX025b) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025c_201904_BDX025b__r__1_SQL_BDX_SQ_c98f5f14c3 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
INFO: Informed scheduler that task BDX_Query_0XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 12 pending tasks possibly being run by other workers
DEBUG: There are 12 pending tasks unique to this worker
DEBUG: There are 12 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 14 tasks of which:
* 1 complete ones were encountered:
- 1 BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=)
* 13 were left pending, among these:
* 1 were missing external dependencies:
- 1 BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
* 2 had missing dependencies:
- 1 BDX_Query_9XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
- 1 BDX_Query_Main(acctDate=201904, ssisDate=201905)
* 10 was not granted run permission by the scheduler:
- 1 BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=201904 Luigi test1, dependQry=BDX010)
- 1 BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020)
- 1 BDX022b(...)
- 1 BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX022b)
- 1 BDX023(acctDate=201904, ssisDate=201905, queryKey=BDX023, queryCmd="r:.SQL\BDX_SQL\BDX_023_P031_MTD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020)
...
Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler
===== Luigi Execution Summary =====
Process finished with exit code 0
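In the log above, every "Checking if BDXxxx" line is immediately followed by a check of BDX025b, including for BDX025b itself. One plausible explanation (an assumption, not verified against the asker's SQLTask) is that `requires1` is a nested function that reads `depend_task` when it is called, not when it is defined. After the loop finishes, every generated `requires` would then return the last list assigned, `[BDX025b(...)]`, so BDX025b would end up depending on itself. The plain-Python sketch below reproduces that pattern without Luigi; `cmd_list`, `build_requires_late_binding`, `make_requires` and `build_requires_bound` are hypothetical names used only for illustration.

```python
# Plain-Python sketch (no Luigi) of the closure pattern in BDX_Query_0XX.run().
# cmd_list is a hypothetical, trimmed version of the asker's cmdList: (key, dependency).
cmd_list = [
    ("BDX010", ""),
    ("BDX020", "BDX010"),
    ("BDX025a", "BDX020"),
    ("BDX025b", "BDX025a"),
    ("BDX025c", "BDX025b"),
]


def build_requires_late_binding(cmds):
    """Mimics the question's loop: every closure reads the same depend_task variable."""
    requires = {}
    depend_task = []
    for key, dep in cmds:
        if dep != "":
            depend_task = [dep]

            def requires1():
                return depend_task  # looked up when called, i.e. after the loop has finished

            requires[key] = requires1
    return requires


def make_requires(deps):
    """Factory that freezes one dependency list in its own enclosing scope."""
    def requires1():
        return deps
    return requires1


def build_requires_bound(cmds):
    """Same loop, but each key gets a closure over its own dependency list."""
    return {key: make_requires([dep]) for key, dep in cmds if dep != ""}


if __name__ == "__main__":
    late = build_requires_late_binding(cmd_list)
    bound = build_requires_bound(cmd_list)
    for key in late:
        print(key, "late:", late[key](), "bound:", bound[key]())
```

If this is the cause, the analogous change in the Luigi code would be to build each class's `requires` through a small factory (or store the dependency on the generated class) so that each class keeps its own dependency instead of sharing one variable.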
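My guess is that:
Parameter "task_process_context" with value "None" is not of type string.
shows that some of your tasks did not return the expected output. In those cases, Luigi treats the task as incomplete.
Make sure every task returns the type that the downstream tasks expect as input. In your case, None is breaking the pipeline execution: try to make sure these tasks return a str instead of None in case they fail or have no data to return. Use an empty string "" or a keyword string for empty dependencies, such as "empty".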
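Following that suggestion, here is a minimal sketch that rewrites None dependency entries in a cmdList-style list of (key, cmd, dependency) tuples before any task is built, so that every value passed to a string `luigi.Parameter` is a string. `normalize_deps` is a hypothetical helper; the placeholder is configurable because the answer mentions both "" and "empty".

```python
from typing import List, Optional, Tuple

# (queryKey, queryCmd, dependQry) as in the question; dependQry may be None.
RawCmd = Tuple[str, str, Optional[str]]
Cmd = Tuple[str, str, str]


def normalize_deps(cmd_list: List[RawCmd], empty: str = "") -> List[Cmd]:
    """Return a copy of cmd_list with every None dependency replaced by `empty`."""
    return [(key, cmd, empty if dep is None else dep) for key, cmd, dep in cmd_list]


if __name__ == "__main__":
    raw = [
        ("BDX010", "BDX_001_NI_DM 010.sql -o output010.txt", None),
        ("BDX020", "BDX_001_NI_DM 020.sql -o output020.txt", "BDX010"),
    ]
    print(normalize_deps(raw))
    print(normalize_deps(raw, empty="empty"))
```

Whichever placeholder you choose, the `dependQry != ''` check in the task code has to compare against that same value.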
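I think you are making this more complicated than it needs to be. First, dynamic dependencies are useful when the full list of tasks is not known until run time. For example, you might have a task that queries a database, and for every row the query returns you need a new dependency.
That is quite different from the use case of programmatically creating a set of tasks and their dependencies, which is what your example seems to be doing.
The following toy code shows how to achieve what you are trying to do: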
import luigi
import datetime
import logging

logger = logging.getLogger('luigi-interface')

task_list = {
    'taskA': ['taskA_command', ''],
    'taskB': ['taskB command', 'taskA'],
    'taskC': ['taskC command', 'taskA'],
    'taskD': ['taskD command', 'taskB'],
}


# Equivalent of your BDX_Task class
class MyTask(luigi.Task):
    task_date = luigi.DateParameter()
    task_name = luigi.Parameter()
    task_command = luigi.Parameter()
    dependent_task_name = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(MyTask, self).__init__(*args, **kwargs)
        logger.debug('MyTask.__init__ called for task_name="{}"'.format(self.task_name))

    def output(self):
        filename = 'output_files/{date:%Y%m%d}/{name}.output'.format(date=self.task_date, name=self.task_name)
        return luigi.LocalTarget(filename)

    def requires(self):
        # Look the dependency up in the module-level task_list and build it with the same class
        if self.dependent_task_name != '' and self.dependent_task_name in task_list:
            dependent_task_command, next_dependent_task_name = task_list[self.dependent_task_name]
            return [self.__class__(
                task_date=self.task_date,
                task_name=self.dependent_task_name,
                task_command=dependent_task_command,
                dependent_task_name=next_dependent_task_name,
            )]
        else:
            return []

    def run(self):
        with self.output().open('w') as handle:
            handle.write('Command to run: "{cmd}"'.format(cmd=self.task_command))


# Equivalent of your BDX_Query_0XX class
class myWrapperTask(luigi.WrapperTask):
    task_date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        for task_name, (task_command, dep_name) in task_list.items():
            yield MyTask(
                task_date=self.task_date,
                task_name=task_name,
                task_command=task_command,
                dependent_task_name=dep_name,
            )
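Because task_list is plain data, its dependency graph can be sanity-checked before Luigi ever sees it. The sketch below is an addition, not part of the answer: it assumes Python 3.9+ for the standard-library `graphlib` module, and `run_order` is a hypothetical helper. It returns an order in which every dependency comes before its dependents, and raises `graphlib.CycleError` if a task depends on itself directly or indirectly, a case where that task would be waiting on itself.

```python
from graphlib import CycleError, TopologicalSorter

task_list = {
    'taskA': ['taskA_command', ''],
    'taskB': ['taskB command', 'taskA'],
    'taskC': ['taskC command', 'taskA'],
    'taskD': ['taskD command', 'taskB'],
}


def run_order(tasks):
    """Return task names so that each dependency appears before its dependents.

    An empty dependency name ('') means "no dependency".
    Raises graphlib.CycleError if the dependencies form a cycle.
    """
    graph = {name: ({dep} if dep else set()) for name, (_cmd, dep) in tasks.items()}
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    print(run_order(task_list))
    try:
        run_order({'taskX': ['taskX command', 'taskX']})  # a self-dependency
    except CycleError as exc:
        print("cycle detected:", exc.args[1])  # args[1] holds the nodes in the cycle
```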
Generated output:
$ PYTHONPATH=.:$PYTHONPATH python -m luigi --module dynamic_dependencies myWrapperTask --local-scheduler --log-level INFO
MyTask.__init__ called for task_name="taskA"
MyTask.__init__ called for task_name="taskB"
MyTask.__init__ called for task_name="taskC"
MyTask.__init__ called for task_name="taskD"
INFO: Informed scheduler that task myWrapperTask_2019_04_12_c2195ac5bd has status PENDING
INFO: Informed scheduler that task MyTask_taskB_taskD_command_2019_04_12_6afc23b3fe has status PENDING
INFO: Informed scheduler that task MyTask_taskA_taskC_command_2019_04_12_35a27fe401 has status PENDING
INFO: Informed scheduler that task MyTask_taskA_taskB_command_2019_04_12_d66dc54b89 has status PENDING
INFO: Informed scheduler that task MyTask__taskA_command_2019_04_12_bdb7812ab5 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=)
INFO: Informed scheduler that task MyTask__taskA_command_2019_04_12_bdb7812ab5 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskB, task_command=taskB command, dependent_task_name=taskA)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskB, task_command=taskB command, dependent_task_name=taskA)
INFO: Informed scheduler that task MyTask_taskA_taskB_command_2019_04_12_d66dc54b89 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskD, task_command=taskD command, dependent_task_name=taskB)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskD, task_command=taskD command, dependent_task_name=taskB)
INFO: Informed scheduler that task MyTask_taskB_taskD_command_2019_04_12_6afc23b3fe has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskC, task_command=taskC command, dependent_task_name=taskA)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskC, task_command=taskC command, dependent_task_name=taskA)
INFO: Informed scheduler that task MyTask_taskA_taskC_command_2019_04_12_35a27fe401 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running myWrapperTask(task_date=2019-04-12)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done myWrapperTask(task_date=2019-04-12)
INFO: Informed scheduler that task myWrapperTask_2019_04_12_c2195ac5bd has status DONE
INFO: Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 5 ran successfully:
- 4 MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=) ...
- 1 myWrapperTask(task_date=2019-04-12)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
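The main structural difference from your code is that my task_list is defined outside the task classes. You may have simplified things for SO, and your cmdList might actually be the output of another task, so it cannot be defined outside the classes. You could work around that by adding the list to globals() once it has been generated, or you could pass the full command list as a parameter to MyTask/BDX_Task so that it can be referenced in requires() (this may not be the best idea if the list could be large). Also, you don't have to use a luigi.WrapperTask as in my example; you can do without one, as you did originally.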
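If cmdList is only built at run time, it first has to be turned into the same {name: [command, dependency]} shape as task_list before it can be published via globals() or passed to MyTask/BDX_Task. Here is a minimal sketch of that conversion. `to_task_list` is a hypothetical helper; it also maps None to '' and rejects dependencies that name a task missing from the list.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def to_task_list(cmd_list: Iterable[Tuple[str, str, Optional[str]]]) -> Dict[str, List[str]]:
    """Convert (key, cmd, dep) tuples into {key: [cmd, dep]}, using '' for "no dependency".

    A later tuple with the same key overwrites an earlier one.
    """
    task_list = {key: [cmd, dep or ''] for key, cmd, dep in cmd_list}
    unknown = sorted({dep for _cmd, dep in task_list.values() if dep and dep not in task_list})
    if unknown:
        raise KeyError(f"dependencies not found in cmd_list: {unknown}")
    return task_list


if __name__ == "__main__":
    cmd_list = [
        ('BDX010', 'BDX_001_NI_DM 010.sql -o output010.txt', None),
        ('BDX020', 'BDX_001_NI_DM 020.sql -o output020.txt', 'BDX010'),
        ('BDX023', 'BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt', 'BDX020'),
    ]
    print(to_task_list(cmd_list))
```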
我正在尝试动态创建 Luigi 任务(基于 cmdList 中的属性)并使前一个任务成为下一个任务的依赖项。 SQLTask 是 Luigi.Task 的子类。但是,当我 运行 这段代码时,我得到 This progress looks :| because there were tasks that were not granted run permission by the scheduler
我错过了什么?
class BDX_Task(SQLTask):
acctDate = luigi.Parameter()
ssisDate = luigi.Parameter(default=None)
queryKey = luigi.Parameter()
queryCmd = luigi.Parameter()
runDesc = luigi.Parameter()
dependQry = luigi.Parameter()
def run(self):
print(subprocess.call(self.queryCmd, shell=True))
self.get_target().touch()
def dep_s_dep(cmdList, dep1):
"""
This returns dependency task's dependency
"""
dep2 = [(key,cmd,dep) for key, cmd, dep in cmdList if key==dep1]
return dep2[0]
class BDX_Query_0XX(SQLTask):
acctDate = luigi.Parameter()
ssisDate = luigi.Parameter()
runDesc = luigi.Parameter()
depend_task = ""
def run(self):
YY = self.acctDate[:4]
MM = self.acctDate[4:6]
acctDate = self.acctDate
ssisDate = self.ssisDate
runDesc = self.runDesc
bdx_sql = r'r:\1.SQL\BDX_SQL\'
cmdList = [
('BDX010',f'{bdx_sql}BDX_001_NI_DM 010.sql -o output010.txt',None),
('BDX020',f'{bdx_sql}BDX_001_NI_DM 020.sql -o output020.txt','BDX010'),
('BDX022a',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql -o output022a.txt','BDX020'),
('BDX022b',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql -o output022b.txt -v Year1={YY} MM={MM}','BDX022a'),
('BDX022c',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql -o output022c.txt -v Year={YY} Month={MM}', 'BDX022b'),
('BDX023',f'{bdx_sql}BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt ','BDX020'),
('BDX024',f'{bdx_sql}BDX_024_P031_ITD_All_Final_CatAdj.sql -o output024.txt','BDX020'),
('BDX025a',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 010.sql -o output025a.txt','BDX020'),
('BDX025b',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 020.sql -o output025b.txt -v Year={YY} Month={MM}','BDX025a'),
('BDX025c',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 030.sql -o output025c.txt -v YYMM={acctDate}','BDX025b')
]
tasks = []
for queryKey, queryCmd, dependQry in cmdList:
class_name = queryKey
klass = type(queryKey, (BDX_Task,),{}) # {'acctDate': self.acctDate, 'queryKey': queryKey, 'queryCmd': queryCmd, 'runDesc': self.runDesc, 'dependQry': dependQry})
if dependQry != '':
dep1 = dep_s_dep(cmdList, dependQry) # info about dependency task (key, cmd, dep's dep)
print(f"{queryKey}'s dep1", dep1)
depend_task = [globals()[dependQry](acctDate=self.acctDate,
ssisDate=self.ssisDate,
queryKey=dep1[0],
queryCmd=dep1[1],
runDesc=self.runDesc,
dependQry=dep1[2])]
def requires1(cls):
return depend_task
setattr(klass, "requires", classmethod(requires1))
globals()[queryKey] = klass # make the class available at the module level
tasks.append(globals()[queryKey](acctDate=self.acctDate, ssisDate =self.ssisDate, queryKey = queryKey, queryCmd = queryCmd, runDesc = self.runDesc, dependQry = dependQry)) # this addes Task class named after queryKey to dependency
yield tasks
self.get_target().touch()
===========堆栈跟踪
C:\ProgramData\Anaconda3\python.exe R:/1.PY/DataPipeLine/run_BDX_process.py BDX_Query_Main --local-scheduler
DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_9XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1) is complete
INFO: Informed scheduler that task BDX_Query_Main_201904_201905_444c47aebc has status PENDING
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1) is complete
BDX020's dep1 ('BDX010', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ', '')
BDX022a's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX022b's dep1 ('BDX022a', '"r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ', 'BDX020')
BDX022c's dep1 ('BDX022b', '"r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04', 'BDX022a')
BDX023's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX024's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX025a's dep1 ('BDX020', '"r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX025b's dep1 ('BDX025a', '"r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ', 'BDX020')
BDX025c's dep1 ('BDX025b', '"r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04', 'BDX025a')
INFO: Informed scheduler that task BDX_Query_9XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
INFO: Informed scheduler that task BDX_Query_0XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 6820] Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) running BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
INFO: [pid 6820] Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) new requirements BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=) is complete
INFO: Informed scheduler that task BDX010_201904___r__1_SQL_BDX_SQ_c7c8473ba5 has status DONE
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=201904 Luigi test1, dependQry=BDX010) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX020_201904_BDX010__r__1_SQL_BDX_SQ_573b857d50 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_7a4a9cc485 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04, runDesc=201904 Luigi test1, dependQry=BDX022a) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_313dc66c50 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX022b) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_d198713a82 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX023(acctDate=201904, ssisDate=201905, queryKey=BDX023, queryCmd="r:.SQL\BDX_SQL\BDX_023_P031_MTD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX023_201904_BDX020__r__1_SQL_BDX_SQ_236e57639e has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX024(acctDate=201904, ssisDate=201905, queryKey=BDX024, queryCmd="r:.SQL\BDX_SQL\BDX_024_P031_ITD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX024_201904_BDX020__r__1_SQL_BDX_SQ_1a8ad5a673 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025a(acctDate=201904, ssisDate=201905, queryKey=BDX025a, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025a_201904_BDX020__r__1_SQL_BDX_SQ_91bd598abf has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025c(acctDate=201904, ssisDate=201905, queryKey=BDX025c, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 030.sql" -v YYMM=201904, runDesc=201904 Luigi test1, dependQry=BDX025b) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025c_201904_BDX025b__r__1_SQL_BDX_SQ_c98f5f14c3 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
INFO: Informed scheduler that task BDX_Query_0XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 12 pending tasks possibly being run by other workers
DEBUG: There are 12 pending tasks unique to this worker
DEBUG: There are 12 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 14 tasks of which:
* 1 complete ones were encountered:
- 1 BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=)
* 13 were left pending, among these:
* 1 were missing external dependencies:
- 1 BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
* 2 had missing dependencies:
- 1 BDX_Query_9XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
- 1 BDX_Query_Main(acctDate=201904, ssisDate=201905)
* 10 was not granted run permission by the scheduler:
- 1 BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=201904 Luigi test1, dependQry=BDX010)
- 1 BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020)
- 1 BDX022b(...)
- 1 BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX022b)
- 1 BDX023(acctDate=201904, ssisDate=201905, queryKey=BDX023, queryCmd="r:\1.SQL\BDX_SQL\BDX_023_P031_MTD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020)
...
Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler
===== Luigi Execution Summary =====
Process finished with exit code 0
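My guess is that this warning:
Parameter "task_process_context" with value "None" is not of type string.
shows that some of your tasks did not get their expected output. In those cases, Luigi treats the task as incomplete.
Make sure every task returns the defined type that the following task expects as input.
In your case, None is breaking the pipeline execution. Try to make sure these tasks return a str instead of None in case they fail or have no data to return.
Use an empty string "" or a keyword string that stands for an empty dependency, such as "empty".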
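A minimal sketch of that suggestion, assuming the question's cmdList shape of (key, command, dependency key) tuples: the None dependency of the first entry is swapped for a string sentinel before any task is built, so every value later handed to a luigi.Parameter is a str. The helper normalize_dependencies is hypothetical, and the commands are shortened from the question.

```python
from typing import List, Optional, Tuple

# Shortened, hypothetical version of the question's cmdList:
# (query key, command string, key of the query it depends on, or None)
cmd_list: List[Tuple[str, str, Optional[str]]] = [
    ("BDX010", "BDX_001_NI_DM 010.sql -o output010.txt", None),
    ("BDX020", "BDX_001_NI_DM 020.sql -o output020.txt", "BDX010"),
    ("BDX022a", "BDX_022_P038_All_Final_CatAdj 010.sql -o output022a.txt", "BDX020"),
]


def normalize_dependencies(
    cmds: List[Tuple[str, str, Optional[str]]], empty: str = ""
) -> List[Tuple[str, str, str]]:
    """Replace a None dependency key with a string sentinel so that every
    value passed on to a luigi.Parameter is a str."""
    return [(key, cmd, dep if dep is not None else empty) for key, cmd, dep in cmds]


normalized = normalize_dependencies(cmd_list)
print(normalized[0])  # ('BDX010', 'BDX_001_NI_DM 010.sql -o output010.txt', '')
```

Passing empty="empty" instead gives the keyword-string variant mentioned above.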
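I think you are making this more complicated than it needs to be. First, dynamic dependencies are useful when the full list of tasks isn't known until run time. For example, you might have a task that queries a database, and for each row the query returns you need a new dependency.
That is quite different from the use case of programmatically creating a set of tasks and their dependencies, which is what you seem to be doing in your example.
The following toy code shows how to achieve what you are trying to do: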
import luigi
import datetime
import logging

logger = logging.getLogger('luigi-interface')

# task name -> [command, name of the task it depends on ('' for none)]
task_list = {
    'taskA': ['taskA_command', ''],
    'taskB': ['taskB command', 'taskA'],
    'taskC': ['taskC command', 'taskA'],
    'taskD': ['taskD command', 'taskB'],
}


# Equivalent of your BDX_Task class
class MyTask(luigi.Task):
    task_date = luigi.DateParameter()
    task_name = luigi.Parameter()
    task_command = luigi.Parameter()
    dependent_task_name = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(MyTask, self).__init__(*args, **kwargs)
        logger.debug('MyTask.__init__ called for task_name="{}"'.format(self.task_name))

    def output(self):
        filename = 'output_files/{date:%Y%m%d}/{name}.output'.format(date=self.task_date, name=self.task_name)
        return luigi.LocalTarget(filename)

    def requires(self):
        # Build the upstream task from task_list; '' means "no dependency"
        if self.dependent_task_name != '' and self.dependent_task_name in task_list:
            dependent_task_command, next_dependent_task_name = task_list[self.dependent_task_name]
            return [self.__class__(
                task_date=self.task_date,
                task_name=self.dependent_task_name,
                task_command=dependent_task_command,
                dependent_task_name=next_dependent_task_name,
            )]
        else:
            return []

    def run(self):
        # Toy run(): writes the command text to the output file instead of executing it
        with self.output().open('w') as handle:
            handle.write('Command to run: "{cmd}"'.format(cmd=self.task_command))


# Equivalent of your BDX_Query_0XX class
class myWrapperTask(luigi.WrapperTask):
    task_date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        for task_name, (task_command, dep_name) in task_list.items():
            yield MyTask(
                task_date=self.task_date,
                task_name=task_name,
                task_command=task_command,
                dependent_task_name=dep_name,
            )
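To see how the chain unfolds, here is a plain-Python sketch (no Luigi needed) that follows the same task_list lookups MyTask.requires() performs, walking from one task back to the root. dependency_chain is a hypothetical helper, not part of Luigi, and the cycle check is an extra safeguard the toy code does not have.

```python
from typing import Dict, List

# Same shape as task_list in the toy code: name -> [command, dependency name or '']
task_list: Dict[str, List[str]] = {
    'taskA': ['taskA_command', ''],
    'taskB': ['taskB command', 'taskA'],
    'taskC': ['taskC command', 'taskA'],
    'taskD': ['taskD command', 'taskB'],
}


def dependency_chain(name: str) -> List[str]:
    """Return [name, its dependency, that task's dependency, ...],
    mirroring the recursive lookups in MyTask.requires()."""
    chain = [name]
    _, dep = task_list[name]
    # Like requires(), stop on '' and silently ignore names not in task_list
    while dep != '' and dep in task_list:
        if dep in chain:
            raise ValueError('dependency cycle at {!r}'.format(dep))
        chain.append(dep)
        _, dep = task_list[dep]
    return chain


print(dependency_chain('taskD'))  # ['taskD', 'taskB', 'taskA']
```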
Output from running the toy code:
$ PYTHONPATH=.:$PYTHONPATH python -m luigi --module dynamic_dependencies myWrapperTask --local-scheduler --log-level INFO
MyTask.__init__ called for task_name="taskA"
MyTask.__init__ called for task_name="taskB"
MyTask.__init__ called for task_name="taskC"
MyTask.__init__ called for task_name="taskD"
INFO: Informed scheduler that task myWrapperTask_2019_04_12_c2195ac5bd has status PENDING
INFO: Informed scheduler that task MyTask_taskB_taskD_command_2019_04_12_6afc23b3fe has status PENDING
INFO: Informed scheduler that task MyTask_taskA_taskC_command_2019_04_12_35a27fe401 has status PENDING
INFO: Informed scheduler that task MyTask_taskA_taskB_command_2019_04_12_d66dc54b89 has status PENDING
INFO: Informed scheduler that task MyTask__taskA_command_2019_04_12_bdb7812ab5 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=)
INFO: Informed scheduler that task MyTask__taskA_command_2019_04_12_bdb7812ab5 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskB, task_command=taskB command, dependent_task_name=taskA)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskB, task_command=taskB command, dependent_task_name=taskA)
INFO: Informed scheduler that task MyTask_taskA_taskB_command_2019_04_12_d66dc54b89 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskD, task_command=taskD command, dependent_task_name=taskB)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskD, task_command=taskD command, dependent_task_name=taskB)
INFO: Informed scheduler that task MyTask_taskB_taskD_command_2019_04_12_6afc23b3fe has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskC, task_command=taskC command, dependent_task_name=taskA)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskC, task_command=taskC command, dependent_task_name=taskA)
INFO: Informed scheduler that task MyTask_taskA_taskC_command_2019_04_12_35a27fe401 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running myWrapperTask(task_date=2019-04-12)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done myWrapperTask(task_date=2019-04-12)
INFO: Informed scheduler that task myWrapperTask_2019_04_12_c2195ac5bd has status DONE
INFO: Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 5 ran successfully:
- 4 MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=) ...
- 1 myWrapperTask(task_date=2019-04-12)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
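In the run above, taskA ran first and taskD only ran after taskB. Luigi only runs a task once the tasks it requires are complete; the relative order of tasks with no dependency between them (taskC versus taskB/taskD here) is a scheduler choice, not something task_list fixes. A hedged stdlib sketch (assumes Python 3.9+ for graphlib) that checks whether an order respects task_list's dependencies, using the observed order and one order produced by TopologicalSorter:

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+
from typing import Dict, List, Set

# Same shape as task_list in the toy code: name -> [command, dependency name or '']
task_list: Dict[str, List[str]] = {
    'taskA': ['taskA_command', ''],
    'taskB': ['taskB command', 'taskA'],
    'taskC': ['taskC command', 'taskA'],
    'taskD': ['taskD command', 'taskB'],
}

# Map each task to the set of tasks it must wait for ('' means none)
graph: Dict[str, Set[str]] = {
    name: ({dep} if dep else set()) for name, (_, dep) in task_list.items()
}


def respects_dependencies(order: List[str]) -> bool:
    """True if every task appears after all the tasks it depends on."""
    position = {name: i for i, name in enumerate(order)}
    return all(
        position[dep] < position[name]
        for name, deps in graph.items()
        for dep in deps
    )


observed = ['taskA', 'taskB', 'taskD', 'taskC']  # order taken from the log above
one_valid_order = list(TopologicalSorter(graph).static_order())

print(respects_dependencies(observed))         # True
print(respects_dependencies(one_valid_order))  # True
```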
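The main structural difference from your code is that my task_list is defined outside the task classes. You may have simplified things for SO, and in reality your cmdList might be the output of another task, so it can't be defined outside the classes. You could work around this by adding the list to globals() once it has been generated, or you could pass the full command list as a parameter to MyTask/BDX_Task so that each task can look it up itself (which may not be the best idea if the list could be large). Also, you could choose not to use a luigi.WrapperTask as in my example and keep a regular task, as you originally did.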
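Assuming cmdList keeps the (key, command, dependency key) tuple shape from the question, it can be reshaped into the task_list dictionary used by the toy code; None becomes '' because that is how the toy requires() recognizes "no dependency". cmd_list_to_task_list is a hypothetical helper, and the commands are shortened from the question.

```python
from typing import Dict, List, Optional, Tuple


def cmd_list_to_task_list(
    cmds: List[Tuple[str, str, Optional[str]]]
) -> Dict[str, List[str]]:
    """Convert (key, command, dependency key or None) tuples into the
    {name: [command, dependency name or '']} shape of task_list."""
    result: Dict[str, List[str]] = {}
    for key, cmd, dep in cmds:
        if key in result:
            # A repeated key would silently overwrite an earlier task
            raise ValueError('duplicate task key {!r}'.format(key))
        result[key] = [cmd, dep if dep is not None else '']
    return result


cmd_list = [
    ('BDX010', 'BDX_001_NI_DM 010.sql -o output010.txt', None),
    ('BDX020', 'BDX_001_NI_DM 020.sql -o output020.txt', 'BDX010'),
    ('BDX023', 'BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt', 'BDX020'),
]
task_list = cmd_list_to_task_list(cmd_list)
print(task_list['BDX010'])  # ['BDX_001_NI_DM 010.sql -o output010.txt', '']
```

The resulting dictionary can then be defined at module level, or built once the upstream task has produced cmdList.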