Beam/Dataflow 基于数据库查询丰富文档的设计模式

Beam/Dataflow design pattern to enrich documents based on database queries

正在评估 Dataflow,我正在尝试找出 if/how 来执行以下操作。

我很抱歉,如果上面的任何内容都是微不足道的——在我们决定使用 Beam 或其他类似 Spark 等之前,尝试先了解一下 Dataflow。

机器学习的一般用例:

如何高效地做到这一点?

NAIVE(尽管序列化要求可能很棘手?):

每个文档都可以简单地单独查询数据库(类似于 ),但是,考虑到其中大部分都是简单的键值存储,似乎应该有更有效的方法来做到这一点(考虑到数据库查询延迟的实际问题)。

场景 #1:改进?:

当前的 strawman 是将表存储在 Bigquery 中,将它们拉下来 (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py),然后将它们用作辅助输入,用作 per-doc 函数中的键值查找).

键值表的范围从通常非常小到不太大(100 MB,可能是低 GB)。 Multiple CoGroupByKey with same key apache beam ("Side inputs can be arbitrarily large - there is no limit; we have seen pipelines successfully run using side inputs of 1+TB in size") 表明这是合理的,至少从尺寸 POV 来看是这样。

1) 这有意义吗?这是这个场景的 "correct" 设计模式吗?

2) 如果这是一个好的设计模式...我该如何实际实现它?

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L53 显示将结果作为 AsList 提供给文档函数。

i) 据推测,对于上述用例,AsDict 在这里更合适?所以我可能需要先对 Bigquery 输出进行 运行 一些转换,以将其分离为键值元组;并确保密钥是唯一的;然后将其用作辅助输入。

ii) 然后我需要在函数中使用侧输入

我不清楚的地方:

情景 #2:进一步改进? (对于特定情况):

比如,对于每个文档,将我们所有的 ngram 写为键,将值作为基础索引(文档中的 docid+索引),然后在这些 ngram 和我们的短语之间进行某种连接地名词典...然后进行另一组转换以恢复原始文档(现在带有新注释)。

也就是说,让 Beam 直接处理所有 joins/lookups?

理论上的优势是,Beam 执行此操作可能比针对每个文档循环遍历所有 ngram 并检查 ngram 是否在 side_input.[=15= 中要快得多]

其他关键问题:

3) 如果这是一个很好的做事方式,有没有什么技巧可以让它在流媒体场景中很好地工作?其他地方的文本表明,在批处理场景之外,侧输入缓存的工作效果更差。目前,我们专注于批处理,但流式处理将与提供实时预测相关。

4) 有任何与 Beam 相关的理由更喜欢 Java>Python 以上任何一项吗?我们有大量现有的 Python 代码可以迁移到 Dataflow,因此我们更喜欢 Python...但不确定上面的 Python 是否存在任何隐藏问题(例如,我注意到 Python 不支持某些功能或 I/O)。

编辑:稻草人?对于示例 ngram 查找场景(应该强烈概括为一般 K:V 查找)

关于您管道的场景:

  1. 天真的场景

你是对的,数据库的每个元素查询是不可取的。

如果您的键值存储能够通过重复使用打开的连接来支持低延迟查找,您可以定义一个全局连接,每个工作人员初始化一次而不是一次每捆。这应该是可以接受的,您的 k-v 存储支持对现有连接的高效查找。

  1. 改进场景

如果这不可行,那么 BQ 是保存和提取数据的好方法。

您绝对可以使用 AsDict 侧输入,只需转到 side_input[my_key]side_input.get(my_key)

您的管道可能看起来像这样:

kv_query = "SELECT key, value FROM my:table.name"
p = beam.Pipeline()
documents_pcoll = p | ReadDocuments()
additional_data_pcoll = (p 
                   | beam.io.BigQuerySource(query=kv_query)
                   # Make row a key-value tuple.
                   | 'format bq' >> beam.Map(lambda row: (row['key'], row['value'])))

enriched_docs = (documents_pcoll 
                 | 'join' >> beam.Map(lambda doc, query: enrich_doc(doc, query[doc['key']]), 
                                      query=AsDict(additional_data_pcoll)))

不幸的是,这有一个缺点,那就是 Python 目前不支持任意大的边输入(它目前将所有 K-V 加载到一个 Python 字典中)。如果你的侧输入数据很大,那么你会想要避免这个选项。

注意这以后会改变,但我们不能确定ATM。

  1. 进一步完善

加入 两个数据集的另一种方法是使用CoGroupByKey。文档和 K-V 附加数据的加载不应该改变,但是加入时,你会做这样的事情:

# Turn the documents into key-value tuples as well[
documents_kv_pcoll = (documents_pcoll 
                      | 'format docs' >> beam.Map(lambda doc: (doc['key'], doc)))
enriched_docs = ({'docs': documents_kv_pcoll, 'additional_data': additional_data_pcoll}
                 | beam.CoGroupByKey()
                 | 'enrich' >> beam.Map(lambda x: enrich_doc(x['docs'][0], x['additional_data'][0]))

CoGroupByKey 将允许您在任一侧使用任意大的集合。

回答您的问题

  1. 如果您需要了解更多,可以查看 example of using BigQuery as a side input in the cookbook. As you can see there, the data comes parsed (I believe that it comes in their original data types, but it may come in string/unicode). Check the docs(或随时询问)。

  2. 目前,Python streaming 处于 alpha 阶段,不支持侧输入;但它确实支持 CoGroupByKey 等随机播放功能。您使用 CoGroupByKey 的流水线应该在流式传输中运行良好。

  3. 比 Python 更喜欢 Java 的一个原因是所有这些功能都适用于 Java(无限大小的侧输入,流式侧输入)。但似乎对于您的用例,Python 可能拥有您所需要的一切。

注意:代码片段是近似值,但您应该能够使用 DirectRunner.

调试它们

如果您觉得有帮助,请随时要求澄清,或询问其他方面。