在 Luigi 的任务之间传递 Python 个对象?
Passing Python objects between Tasks in Luigi?
我在 Python 3.6 中编写我的第一个项目,使用 Spotify's Luigi 在管道中安排一些自然语言处理任务。
我注意到 Task
class 的 output()
函数总是 returns 某种 Target
对象,它只是某处的某个文件,无论是本地还是远程。因为我的任务会产生更复杂的数据结构,如解析树,所以将它们作为字符串写入文件并在之后再次读取它们对我来说非常尴尬。
因此我想问一下是否有可能在管道中的任务之间传递 Python 对象?
简答:否。
Luigi 参数仅限于 date/datetime 个对象、字符串、整数和浮点数。参见 docs for reference。
这意味着您需要将复杂的数据结构序列化为字符串(使用 json、msgpack、任何您喜欢的序列化程序,甚至压缩它)并将其作为字符串参数传递。
当然,你可以写一个自定义的Parameter子类,但是你需要基本实现serialize and parse methods。
但请注意:如果您使用参数而不是将计算的数据保存到目标,您将失去使用 Luigi 的一个关键优势:如果树中的父任务失败次数超过重试次数指定,那么您将需要 运行 再次计算该复杂数据结构的任务。如果您的任务计算复杂数据或花费大量时间或消耗大量资源,那么您应该将输出保存为目标,以免再次进行所有昂贵的计算。
展望未来:另一个任务可能也需要该数据,那么为什么不保存它呢?
此外,请注意目标不仅是文件:您可以将数据保存到数据库 table、Redis、Hadoop、Elastic Search 索引等等:http://luigi.readthedocs.io/en/stable/api/luigi.contrib.html#submodules
还有其他 - 仍然有点老套 - 方法来实现你想用目标而不是参数做的事情。
luigi.mock
中有一个特殊的 MockFile 目标,允许您将其 "file" 存储在内存中。
它的 api 与其他继承 类 的 Target 相似,因此您必须 open
、read
和 write
。突然它只支持 string
输入,因此您仍然需要序列化您的对象(这是由于通过进程之间的管道发送此数据)。请参阅以下示例(yaml 序列化):
import yaml
from luigi import Task
class TaskA(Task):
def output(self):
return MockFile('whatever')
def run(self):
object_to_send = yaml.dump({"example": "dict"})
_out = self.output().open('r')
_out.write(object_to_send)
_out.close()
class TaskB(Task):
def requires(self):
return TaskA()
def run(self):
_in = self.input().read('r')
serialised = _in.read()
deserialised = yaml.load(serialised)
print(deserialised)
请注意,序列化大对象可能会花费很多时间。
我在 Python 3.6 中编写我的第一个项目,使用 Spotify's Luigi 在管道中安排一些自然语言处理任务。
我注意到 Task
class 的 output()
函数总是 returns 某种 Target
对象,它只是某处的某个文件,无论是本地还是远程。因为我的任务会产生更复杂的数据结构,如解析树,所以将它们作为字符串写入文件并在之后再次读取它们对我来说非常尴尬。
因此我想问一下是否有可能在管道中的任务之间传递 Python 对象?
简答:否。
Luigi 参数仅限于 date/datetime 个对象、字符串、整数和浮点数。参见 docs for reference。
这意味着您需要将复杂的数据结构序列化为字符串(使用 json、msgpack、任何您喜欢的序列化程序,甚至压缩它)并将其作为字符串参数传递。
当然,你可以写一个自定义的Parameter子类,但是你需要基本实现serialize and parse methods。
但请注意:如果您使用参数而不是将计算的数据保存到目标,您将失去使用 Luigi 的一个关键优势:如果树中的父任务失败次数超过重试次数指定,那么您将需要 运行 再次计算该复杂数据结构的任务。如果您的任务计算复杂数据或花费大量时间或消耗大量资源,那么您应该将输出保存为目标,以免再次进行所有昂贵的计算。
展望未来:另一个任务可能也需要该数据,那么为什么不保存它呢?
此外,请注意目标不仅是文件:您可以将数据保存到数据库 table、Redis、Hadoop、Elastic Search 索引等等:http://luigi.readthedocs.io/en/stable/api/luigi.contrib.html#submodules
还有其他 - 仍然有点老套 - 方法来实现你想用目标而不是参数做的事情。
luigi.mock
中有一个特殊的 MockFile 目标,允许您将其 "file" 存储在内存中。
它的 api 与其他继承 类 的 Target 相似,因此您必须 open
、read
和 write
。突然它只支持 string
输入,因此您仍然需要序列化您的对象(这是由于通过进程之间的管道发送此数据)。请参阅以下示例(yaml 序列化):
import yaml
from luigi import Task
class TaskA(Task):
def output(self):
return MockFile('whatever')
def run(self):
object_to_send = yaml.dump({"example": "dict"})
_out = self.output().open('r')
_out.write(object_to_send)
_out.close()
class TaskB(Task):
def requires(self):
return TaskA()
def run(self):
_in = self.input().read('r')
serialised = _in.read()
deserialised = yaml.load(serialised)
print(deserialised)
请注意,序列化大对象可能会花费很多时间。