Replacing a table load function with a luigi task
I have a Python function that loads data into a SQL Server table from two other tables.
def load_table(date1,date2):
    strDate1 = date1.strftime('%m/%d/%Y')
    strDate2 = date2.strftime('%m/%d/%Y')
    stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
    cnx1= connection string
    self.curs=cnx1.cursor()
    self.curs.execute(stmt)
    self.curs.commit()
I am trying to convert this function into a luigi task.
Following the documentation, I tried the following:
class Datetask(luigi.Task):
    def output(self):
        return luigi.DateParameter()

class loading(luigi.Task):
    def requires(self):
        return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}

    def run(self):
        date1 = dict['date1']
        date2 = dict['date2']
        strDate1 = date1.strftime('%m/%d/%Y')
        strDate2 = date2.strftime('%m/%d/%Y')
        stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
        curs=cnx1.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()
When I try to run it, I get an error:
python -m luigi --module load_task loading --local-scheduler
DEBUG: Checking if loading() is complete
/usr/local/lib/python2.7/dist-packages/luigi/worker.py:305: UserWarning: Task loading() without outputs has no custom complete() method
  is_complete = task.complete()
WARNING: Will not run loading() or any dependencies due to error in deps() method:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 697, in _add
    deps = task.deps()
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 572, in deps
    return flatten(self._requires())
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 544, in _requires
    return flatten(self.requires()) # base impl
  File "load_task.py", line 19, in requires
    return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}
NameError: global name 'DateTask' is not defined
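I am defining DateTask, so this error confuses me.
Also, do all the tasks need to have all 3 requires(),run,output?
Also, is it necessary to always write the output to a file?
I am brand new to luigi, so any input would be appreciated.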
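The NameError comes from a capitalization mismatch: the class is defined as Datetask but referenced as DateTask, and Python names are case-sensitive. Beyond that, I think an approach like this would work better: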
import luigi

class LoadTask(luigi.Task):
    # The two dates become task parameters instead of a separate task
    date1 = luigi.DateParameter()
    date2 = luigi.DateParameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget("{0}-{1}.txt".format(self.date1, self.date2))

    def run(self):
        strDate1 = self.date1.strftime('%m/%d/%Y')
        strDate2 = self.date2.strftime('%m/%d/%Y')
        stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
        # cnx1 is the database connection from the question; it must exist before this point
        curs=cnx1.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()
        # Write the output file so luigi can tell that the task has completed
        with self.output().open('w') as out_file:
            out_file.write("{0} {1}\n".format(strDate1, strDate2))
Invoke it like this:
luigi --module load_task LoadTask --date1 2017-01-01 --date2 2017-01-02 --local-scheduler
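The statement in run() is assembled with % string formatting. As a minimal sketch, the same insert-select can be issued with bound parameters instead. The example below uses an in-memory sqlite3 database purely as a stand-in for SQL Server, so the "?" placeholder style, the ISO date strings, the column types, and the sample rows are all assumptions made for illustration; the placeholder syntax of your actual driver may differ.

```python
import datetime
import sqlite3

# Same statement shape as in run(), with "?" placeholders instead of '%s'.
STMT = (
    "insert into Agent_Queue (ID) "
    "select distinct Send_Location_ID from Pretty_Txns "
    "where Send_Date >= ? and Send_Date <= ? and Send_Location_ID is not null "
    "union "
    "select distinct Pay_Location_ID from Pretty_Txns "
    "where Pay_Date >= ? and Pay_Date <= ? and Pay_Location_ID is not null"
)


def load_table(cnx, date1, date2):
    """Insert the distinct location IDs seen between date1 and date2."""
    # Assumption for this demo: dates are stored as ISO text, which sorts correctly.
    d1, d2 = date1.isoformat(), date2.isoformat()
    curs = cnx.cursor()
    curs.execute(STMT, (d1, d2, d1, d2))  # the driver binds the values
    cnx.commit()
    curs.close()


def demo():
    """Build hypothetical tables and sample rows, then run the load."""
    cnx = sqlite3.connect(":memory:")
    cnx.execute("create table Agent_Queue (ID integer)")
    cnx.execute(
        "create table Pretty_Txns ("
        "Send_Location_ID integer, Send_Date text, "
        "Pay_Location_ID integer, Pay_Date text)"
    )
    cnx.executemany(
        "insert into Pretty_Txns values (?, ?, ?, ?)",
        [
            (1, "2017-01-01", 2, "2017-01-02"),
            (1, "2017-01-02", None, None),
            (3, "2016-12-01", 4, "2016-12-02"),
        ],
    )
    load_table(cnx, datetime.date(2017, 1, 1), datetime.date(2017, 1, 2))
    return sorted(row[0] for row in cnx.execute("select ID from Agent_Queue"))
```

Binding the values leaves quoting and date conversion to the driver, so the statement text stays the same for every date range.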
Also, do all the tasks need to have all 3 requires(),run,output?
In general, yes. That said, some task types, such as luigi.WrapperTask, do not need an output(), and if yours is the first task in the chain you can return None from requires(), and so on.
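To see why the three methods matter together, here is a toy model of how a scheduler might use them. It is not luigi's implementation: ToyTask, build, Extract, and Load are hypothetical names, outputs are plain file paths, and the real scheduler does considerably more. The sketch only shows the basic contract: a task counts as complete when its outputs exist, its requirements run first, and completed work is skipped.

```python
import os
import tempfile


class ToyTask(object):
    """Hypothetical stand-in for a task; not luigi's implementation."""

    def requires(self):
        return []  # no upstream tasks by default

    def output(self):
        return []  # file paths this task produces

    def run(self):
        pass

    def complete(self):
        # Complete only if there is at least one output and all outputs exist.
        outputs = self.output()
        return bool(outputs) and all(os.path.exists(p) for p in outputs)


def build(task, log):
    """Run dependencies first, then the task, skipping completed work."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(type(task).__name__)


class Extract(ToyTask):
    def __init__(self, workdir):
        self.workdir = workdir

    def output(self):
        return [os.path.join(self.workdir, "extract.txt")]

    def run(self):
        with open(self.output()[0], "w") as f:
            f.write("data\n")


class Load(ToyTask):
    def __init__(self, workdir):
        self.workdir = workdir

    def requires(self):
        return [Extract(self.workdir)]

    def output(self):
        return [os.path.join(self.workdir, "load.txt")]

    def run(self):
        with open(self.output()[0], "w") as f:
            f.write("loaded\n")


def demo():
    """Build the same task twice; the second build finds nothing to do."""
    workdir = tempfile.mkdtemp()
    first, second = [], []
    build(Load(workdir), first)
    build(Load(workdir), second)
    return first, second
```

In this model a task without any output can never report itself complete, which mirrors the "without outputs has no custom complete() method" warning in the question's log.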
Also, is it necessary to always write the output to a file?
No. For example, the SQLAlchemy contrib module defines a Target subclass that you can use as a target in a database.
http://luigi.readthedocs.io/en/stable/api/luigi.contrib.sqla.html
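The general idea behind a database-backed target can be sketched without luigi: instead of checking for a file, the target checks for a marker row that is written once the load has finished. The class below is a hypothetical illustration using sqlite3; MarkerTableTarget, task_markers, and the update_id string are invented names, and the real class in luigi.contrib.sqla has its own constructor and behavior, described at the link above.

```python
import sqlite3


class MarkerTableTarget(object):
    """Hypothetical target that 'exists' once a marker row is recorded."""

    def __init__(self, cnx, update_id):
        self.cnx = cnx
        self.update_id = update_id  # identifies one task run
        self.cnx.execute(
            "create table if not exists task_markers "
            "(update_id text primary key)"
        )

    def exists(self):
        # The task counts as done when its marker row is present.
        row = self.cnx.execute(
            "select 1 from task_markers where update_id = ?",
            (self.update_id,),
        ).fetchone()
        return row is not None

    def touch(self):
        # Record completion; call this after the load has been committed.
        self.cnx.execute(
            "insert or ignore into task_markers (update_id) values (?)",
            (self.update_id,),
        )
        self.cnx.commit()


def demo():
    """Check the marker before and after recording completion."""
    cnx = sqlite3.connect(":memory:")
    target = MarkerTableTarget(cnx, "LoadTask(2017-01-01, 2017-01-02)")
    before = target.exists()
    target.touch()
    return before, target.exists()
```

With a target like this, the task's completion state lives in the same database as the loaded rows, so no marker file is needed on disk.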