google 云数据流中的并行和批量处理数据存储元素

Parallel and bulk process datastore elements in google cloud dataflow

问题:

我的数据存储项目中有超过 200 万用户数据的列表。我想向所有用户发送每周时事通讯。邮寄 API 每个 API 电话最多接受 50 个电子邮件地址。

上一个解法:

使用应用引擎后端和一个简单的数据存储区查询一次性处理所有记录。但是发生的事情是,有时我会收到内存溢出严重错误日志,并且该过程会重新开始。因此,一些用户不止一次收到相同的电子邮件。所以我转向了数据流。

当前解:

我使用 FlatMap 函数将每个电子邮件 ID 发送到一个函数,然后分别向每个用户发送电子邮件。

def process_datastore(project, pipeline_options):
    p = beam.Pipeline(options=pipeline_options)
    query = make_query()
    entities = (p | 'read from datastore' >> ReadFromDatastore(project, query))
    entities | beam.FlatMap(lambda entity: sendMail([entity.properties.get('emailID', "")]))
    return p.run()

通过云数据流,我确保每个用户只收到一次邮件,也不会遗漏任何人。没有内存错误。

但是当前这个过程需要 7 个小时才能完成 运行。我曾尝试用 ParDo 替换 FlatMap,并假设 ParDo 会并行处理该过程。但即使那样也需要同样的时间。

问题:

  1. 如何将电子邮件 ID 打包成 50 个一组,以便有效地使用邮件 API 调用?

  2. 如何并行化流程,使所用时间少于一个小时?

您可以使用查询游标将您的用户分成 50 个批次,并在推送队列或 deferred 任务中进行实际的批处理(电子邮件发送)。这将是一个仅限 GAE 的解决方案,没有云数据流,恕我直言,要简单得多。

您可以在 中找到此类处理的示例(同时考虑到答案)。该解决方案使用 deferred 库,但改用推送队列任务几乎是微不足道的。

答案涉及并行性方面,您可能希望限制并行性以降低成本。

您还可以将批处理本身拆分到任务中以获得无限可扩展的解决方案(任意数量的收件人,不会触及内存或超过截止日期失败),任务重新排队以从它所在的位置继续工作离开了。