luigi 上游任务应该 运行 一次为下游任务集创建输入

luigi upstream task should run once to create input for set of downstream tasks

我有一个很好的直工作管道,我 运行 通过命令行上的 luigi 执行的任务会触发所有必需的上游数据获取并按正确的顺序进行处理,直到它流入我的数据库。

class IMAP_Fetch(luigi.Task):
  """fetch a bunch of email messages with data in them"""
  date = luigi.DateParameter()
  uid = luigi.Parameter()
…
  def output(self):
    loc = os.path.join(self.data_drop, str(self.date))
    # target for requested message
    yield LocalTarget(os.path.join(loc, uid+".msg"))

  def run(self):
     # code to connect to IMAP server and run FETCH on given UID
     # message gets written to self.output()
…

class RecordData(luigi.contrib.postgres.CopyToTable):
  """copy the data in one email message to the database table"""
  uid = luigi.Parameter()
  date = luigi.DateParameter()
  table = 'msg_data'
  columns = [(id, int), …]

  def requires(self):
    # a task (not shown) that extracts data from one message  
    # which in turn requires the IMAP_Fetch to pull down the message
    return MsgData(self.date, self.uid) 

  def rows(self):
    # code to read self.input() and yield lists of data values 

很棒的东西。不幸的是,第一次数据获取与远程 IMAP 服务器通信,每次获取都是一个新连接和一个新查询:非常慢。我知道如何在一个会话(任务实例)中获取所有单独的消息文件。我不明白如何让下游任务保持原样,一次处理一条消息,因为需要一条消息的任务只会触发一条消息的获取,而不是所有可用消息的获取。对于缺少明显的解决方案,我提前表示歉意,但到目前为止,如何让我漂亮的简单愚蠢的管道保持原样,但让顶部的漏斗在一次调用中吸收所有数据,这让我很困惑。感谢您的帮助。

我在您的解释中缺少的是发送到 RecordData 任务的 uid 值列表的来源。对于此解释,我假设您有一组 uid 个值,您希望将其合并到一个 ImapFetch 请求中。

一种可能的方法是定义一个 batch_id,此外还有您的 uid,其中 batch_id 指的是您希望在单个会话中获取的消息组。 . uidbatch_id 之间的关联存储在何处由您决定。它可以是传递给管道的参数,也可以存储在外部。您遗漏的任务,MsgData,其 requires 方法 returns 单个 ImapFetch 任务目前具有 uid 参数的任务,应该需要一个ImapFetch 采用 batch_id 参数的任务。 MsgData 任务所需的第一个 ImapFetch 任务将检索与该 batch_id 关联的所有 uid 值,然后在单个会话中检索这些消息。所有其他 MsgData 任务都需要(并等待)这批 ImapFetch 完成,然后它们都可以像管道的其余部分一样执行各自的消息。因此,调整批量大小可能对整体处理吞吐量很重要。

另一个缺点是它在批处理级别而不是单个项目级别的原子性较低,因为如果只有一个 uid 值未成功,则批处理 ImapFetch 将失败检索到。

第二种方法是将 Imap 会话作为每个进程(工作者)的更多单例资源打开,并让 ImapFetch 任务重用同一会话。

你也可以使用这样的计数器。

class WrapperTask(luigi.WrapperTask):
      counter = 0 
      def requires(self):
         if self.counter == 0:  # check if this is the first time the long DB process is called
         ## do your process here. This executes only once and is skipped next time due to the counter
         self.counter = self.counter + 1
         return OtherTask(parameters)