使用最新的 python apache_beam cloud datafow sdk 创建自定义源以从云数据存储中读取

Creating custom source for reading from cloud datastore using latest python apache_beam cloud datafow sdk

最近云数据流python sdk 可用,我决定使用它。不幸的是,从云数据存储中读取的支持尚未到来,所以我不得不回过头来编写自定义源代码,以便我可以像承诺的那样利用动态拆分、进度估计等的好处。我确实彻底研究了文档,但无法将各个部分放在一起,以便加快整个过程。

为了更清楚,我的第一个方法是:

  1. 正在查询云数据存储
  2. 正在创建 ParDo 函数并将返回的查询传递给它。

但是用这个迭代 20 万个条目需要 13 分钟。

所以我决定编写可以有效读取实体的自定义源代码。但是由于我缺乏对将各个部分组合在一起的理解,我无法实现这一目标。任何人都可以帮助我如何创建自定义源以从数据存储中读取。

已编辑: 对于第一种方法,link 我的要点是: https://gist.github.com/shriyanka/cbf30bbfbf277deed4bac0c526cf01f1

谢谢。

在您提供的代码中,对 Datastore 的访问发生在管道构建之前:

query = client.query(kind='User').fetch()

这会在 Beam SDK 完全参与之前执行整个查询并读取所有实体。

更准确地说,fetch() returns 是对查询结果的惰性迭代,它们 在构建管道时 beam.Create(query) - 但是,这再次发生在您的主程序中,在管道启动之前。最有可能的是,这需要 13 分钟,而不是管道本身(但请随时提供作业 ID,以便我们进行更深入的了解)。您可以通过对代码做一些小改动来验证这一点:

query = list(client.query(kind='User').fetch())

但是,我认为您的意图是同时读取处理实体。

特别是对于 Cloud Datastore,自定义源 API 不是执行此操作的最佳选择。原因是底层 Cloud Datastore API 本身目前不提供实现自定义源 "goodies" 所需的属性,例如进度估计和动态拆分,因为它的查询 API 非常通用(与 Cloud Bigtable 不同,它总是 returns 按键排序的结果,因此例如,您可以通过查看当前键来估计进度)。

我们目前正在重写 Java 云数据存储连接器以使用不同的方法,该方法使用 ParDo 拆分查询并使用 ParDo 读取每个子-查询。详情请见this pull request