如何使用 Luigi 动态检查输出
How to check output dynamically with Luigi
我意识到我可能需要使用动态需求来完成以下任务,但是我无法完全理解这在实践中会是什么样子。
目标是使用 Luigi 生成数据并将其添加到数据库中,而无需提前知道将生成什么数据。
以mongodb为例:
import luigi
from uuid import uuid4
from luigi.contrib import mongodb
import pymongo
# Make up IDs, though in practice the IDs may be generated from an API
class MakeID(luigi.Task):
def run(self):
with self.output().open('w') as f:
f.write(','.join([str(uuid4()) for e in range(10)]))
# Write the data to file
def output(self):
return luigi.LocalTarget('data.csv')
class ToDataBase(luigi.Task):
def requires(self):
return MakeID()
def run(self):
with self.input().open('r') as f:
ids = f.read().split(',')
# Add some fake data to simulate generating new data
count_data = {key: value for value, key in enumerate(ids)}
# Add data to the database
self.output().write(count_data)
def output(self):
# Attempt to read non-existent file to get the IDs to check if task is complete
with self.input().open('r') as f:
valid_ids = f.read().split(',')
client = pymongo.MongoClient('localhost',
27017,
ssl=False)
return mongodb.MongoRangeTarget(client,
'myDB',
'myData',
valid_ids,
'myField')
if __name__ == '__main__':
luigi.run()
目标是获取数据,对其进行修改,然后将其添加到数据库中。
以上代码在运行时失败,因为ToDataBase
运行的output
方法在require
方法之前,所以while函数可以访问到输入,输入还不存在。无论如何,我仍然需要检查以确保数据已添加到数据库中。
这个 github issue 接近我正在寻找的东西,尽管正如我提到的,我在实践中无法弄清楚这个用例的动态需求。
解决方案是创建第三个任务(在示例 Dynamic
中),它生成等待动态输入的任务,并使依赖项成为参数而不是 requires
方法。
class ToDatabase(luigi.Task):
fp = luigi.Parameter()
def output(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
client = pymongo.MongoClient('localhost', 27017, ssl=False)
return mongodb.MongoRangeTarget(client, 'myDB', 'myData',
valid_ids, 'myField')
def run(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
self.output().write({k: 5 for k in valid_ids})
class Dynamic(luigi.Task):
def output(self):
return self.input()
def requires(self):
return MakeIDs()
def run(self):
yield(AddToDatabase(fp=self.input().path))
我意识到我可能需要使用动态需求来完成以下任务,但是我无法完全理解这在实践中会是什么样子。
目标是使用 Luigi 生成数据并将其添加到数据库中,而无需提前知道将生成什么数据。
以mongodb为例:
import luigi
from uuid import uuid4
from luigi.contrib import mongodb
import pymongo
# Make up IDs, though in practice the IDs may be generated from an API
class MakeID(luigi.Task):
def run(self):
with self.output().open('w') as f:
f.write(','.join([str(uuid4()) for e in range(10)]))
# Write the data to file
def output(self):
return luigi.LocalTarget('data.csv')
class ToDataBase(luigi.Task):
def requires(self):
return MakeID()
def run(self):
with self.input().open('r') as f:
ids = f.read().split(',')
# Add some fake data to simulate generating new data
count_data = {key: value for value, key in enumerate(ids)}
# Add data to the database
self.output().write(count_data)
def output(self):
# Attempt to read non-existent file to get the IDs to check if task is complete
with self.input().open('r') as f:
valid_ids = f.read().split(',')
client = pymongo.MongoClient('localhost',
27017,
ssl=False)
return mongodb.MongoRangeTarget(client,
'myDB',
'myData',
valid_ids,
'myField')
if __name__ == '__main__':
luigi.run()
目标是获取数据,对其进行修改,然后将其添加到数据库中。
以上代码在运行时失败,因为ToDataBase
运行的output
方法在require
方法之前,所以while函数可以访问到输入,输入还不存在。无论如何,我仍然需要检查以确保数据已添加到数据库中。
这个 github issue 接近我正在寻找的东西,尽管正如我提到的,我在实践中无法弄清楚这个用例的动态需求。
解决方案是创建第三个任务(在示例 Dynamic
中),它生成等待动态输入的任务,并使依赖项成为参数而不是 requires
方法。
class ToDatabase(luigi.Task):
fp = luigi.Parameter()
def output(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
client = pymongo.MongoClient('localhost', 27017, ssl=False)
return mongodb.MongoRangeTarget(client, 'myDB', 'myData',
valid_ids, 'myField')
def run(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
self.output().write({k: 5 for k in valid_ids})
class Dynamic(luigi.Task):
def output(self):
return self.input()
def requires(self):
return MakeIDs()
def run(self):
yield(AddToDatabase(fp=self.input().path))