luigi 上游任务应该 运行 一次为下游任务集创建输入
luigi upstream task should run once to create input for set of downstream tasks
我有一个很好的直工作管道,我 运行 通过命令行上的 luigi 执行的任务会触发所有必需的上游数据获取并按正确的顺序进行处理,直到它流入我的数据库。
class IMAP_Fetch(luigi.Task):
"""fetch a bunch of email messages with data in them"""
date = luigi.DateParameter()
uid = luigi.Parameter()
…
def output(self):
loc = os.path.join(self.data_drop, str(self.date))
# target for requested message
yield LocalTarget(os.path.join(loc, uid+".msg"))
def run(self):
# code to connect to IMAP server and run FETCH on given UID
# message gets written to self.output()
…
class RecordData(luigi.contrib.postgres.CopyToTable):
"""copy the data in one email message to the database table"""
uid = luigi.Parameter()
date = luigi.DateParameter()
table = 'msg_data'
columns = [(id, int), …]
def requires(self):
# a task (not shown) that extracts data from one message
# which in turn requires the IMAP_Fetch to pull down the message
return MsgData(self.date, self.uid)
def rows(self):
# code to read self.input() and yield lists of data values
很棒的东西。不幸的是,第一次数据获取与远程 IMAP 服务器通信,每次获取都是一个新连接和一个新查询:非常慢。我知道如何在一个会话(任务实例)中获取所有单独的消息文件。我不明白如何让下游任务保持原样,一次处理一条消息,因为需要一条消息的任务只会触发一条消息的获取,而不是所有可用消息的获取。对于缺少明显的解决方案,我提前表示歉意,但到目前为止,如何让我漂亮的简单愚蠢的管道保持原样,但让顶部的漏斗在一次调用中吸收所有数据,这让我很困惑。感谢您的帮助。
我在您的解释中缺少的是发送到 RecordData
任务的 uid
值列表的来源。对于此解释,我假设您有一组 uid
个值,您希望将其合并到一个 ImapFetch
请求中。
一种可能的方法是定义一个 batch_id
,此外还有您的 uid
,其中 batch_id
指的是您希望在单个会话中获取的消息组。 . uid
和 batch_id
之间的关联存储在何处由您决定。它可以是传递给管道的参数,也可以存储在外部。您遗漏的任务,MsgData
,其 requires
方法 returns 单个 ImapFetch
任务目前具有 uid
参数的任务,应该需要一个ImapFetch
采用 batch_id
参数的任务。 MsgData
任务所需的第一个 ImapFetch
任务将检索与该 batch_id
关联的所有 uid
值,然后在单个会话中检索这些消息。所有其他 MsgData
任务都需要(并等待)这批 ImapFetch
完成,然后它们都可以像管道的其余部分一样执行各自的消息。因此,调整批量大小可能对整体处理吞吐量很重要。
另一个缺点是它在批处理级别而不是单个项目级别的原子性较低,因为如果只有一个 uid
值未成功,则批处理 ImapFetch
将失败检索到。
第二种方法是将 Imap 会话作为每个进程(工作者)的更多单例资源打开,并让 ImapFetch 任务重用同一会话。
你也可以使用这样的计数器。
class WrapperTask(luigi.WrapperTask):
counter = 0
def requires(self):
if self.counter == 0: # check if this is the first time the long DB process is called
## do your process here. This executes only once and is skipped next time due to the counter
self.counter = self.counter + 1
return OtherTask(parameters)
我有一个很好的直工作管道,我 运行 通过命令行上的 luigi 执行的任务会触发所有必需的上游数据获取并按正确的顺序进行处理,直到它流入我的数据库。
class IMAP_Fetch(luigi.Task):
"""fetch a bunch of email messages with data in them"""
date = luigi.DateParameter()
uid = luigi.Parameter()
…
def output(self):
loc = os.path.join(self.data_drop, str(self.date))
# target for requested message
yield LocalTarget(os.path.join(loc, uid+".msg"))
def run(self):
# code to connect to IMAP server and run FETCH on given UID
# message gets written to self.output()
…
class RecordData(luigi.contrib.postgres.CopyToTable):
"""copy the data in one email message to the database table"""
uid = luigi.Parameter()
date = luigi.DateParameter()
table = 'msg_data'
columns = [(id, int), …]
def requires(self):
# a task (not shown) that extracts data from one message
# which in turn requires the IMAP_Fetch to pull down the message
return MsgData(self.date, self.uid)
def rows(self):
# code to read self.input() and yield lists of data values
很棒的东西。不幸的是,第一次数据获取与远程 IMAP 服务器通信,每次获取都是一个新连接和一个新查询:非常慢。我知道如何在一个会话(任务实例)中获取所有单独的消息文件。我不明白如何让下游任务保持原样,一次处理一条消息,因为需要一条消息的任务只会触发一条消息的获取,而不是所有可用消息的获取。对于缺少明显的解决方案,我提前表示歉意,但到目前为止,如何让我漂亮的简单愚蠢的管道保持原样,但让顶部的漏斗在一次调用中吸收所有数据,这让我很困惑。感谢您的帮助。
我在您的解释中缺少的是发送到 RecordData
任务的 uid
值列表的来源。对于此解释,我假设您有一组 uid
个值,您希望将其合并到一个 ImapFetch
请求中。
一种可能的方法是定义一个 batch_id
,此外还有您的 uid
,其中 batch_id
指的是您希望在单个会话中获取的消息组。 . uid
和 batch_id
之间的关联存储在何处由您决定。它可以是传递给管道的参数,也可以存储在外部。您遗漏的任务,MsgData
,其 requires
方法 returns 单个 ImapFetch
任务目前具有 uid
参数的任务,应该需要一个ImapFetch
采用 batch_id
参数的任务。 MsgData
任务所需的第一个 ImapFetch
任务将检索与该 batch_id
关联的所有 uid
值,然后在单个会话中检索这些消息。所有其他 MsgData
任务都需要(并等待)这批 ImapFetch
完成,然后它们都可以像管道的其余部分一样执行各自的消息。因此,调整批量大小可能对整体处理吞吐量很重要。
另一个缺点是它在批处理级别而不是单个项目级别的原子性较低,因为如果只有一个 uid
值未成功,则批处理 ImapFetch
将失败检索到。
第二种方法是将 Imap 会话作为每个进程(工作者)的更多单例资源打开,并让 ImapFetch 任务重用同一会话。
你也可以使用这样的计数器。
class WrapperTask(luigi.WrapperTask):
counter = 0
def requires(self):
if self.counter == 0: # check if this is the first time the long DB process is called
## do your process here. This executes only once and is skipped next time due to the counter
self.counter = self.counter + 1
return OtherTask(parameters)