使用 luigi 更新 Postgres table

Using luigi to update Postgres table

我刚刚开始使用 luigi 库。我定期抓取网站并将任何新记录插入 Postgres 数据库。当我试图重写部分脚本以使用 luigi 时,我不清楚应该如何使用 "marker table"

工作流:

  1. 抓取数据
  2. 查询数据库以检查新数据是否与旧数据不同。
  3. 如果是,将新数据存储在同一个 table.

但是,使用 luigi 的 postgres.CopyToTable,如果 table 已经存在,则不会插入新数据。我想我应该使用 table_updates table 中的 inserted 列来确定应该插入哪些新数据,但我不清楚该过程是什么样的,我不能在线查找任何明确的示例。

你不必太担心标记 table:它是内部 table luigi 用来跟踪哪个任务已经成功执行的。为此,luigi 使用了您任务的 update_id 属性。如果你没有声明一个,那么 luigi 将使用 task_id as shown here。 task_id 是任务系列名称和任务的前三个参数的串联。

此处的关键是覆盖任务的 update_id 属性 和 return 自定义字符串,您将知道每个 运行 的自定义字符串都是唯一的你的任务。通常你应该使用任务的重要参数,比如:

@property
def update_id(self):
    return ":".join(self.param1, self.param2, self.param3)

显着 我的意思是改变任务输出的参数。我想象像网站 url o id 和抓取日期这样的参数。数据库的主机名、端口、用户名或密码等参数对于这些任务中的任何一个都是相同的,因此不应将它们视为重要参数。

请注意,如果没有关于您的 table 和您尝试保存的数据的详细信息,很难说明您必须如何构建 update_id 字符串,所以请小心。