Beam/Dataflow 基于数据库查询丰富文档的设计模式
Beam/Dataflow design pattern to enrich documents based on database queries
正在评估 Dataflow,我正在尝试找出 if/how 来执行以下操作。
我很抱歉,如果上面的任何内容都是微不足道的——在我们决定使用 Beam 或其他类似 Spark 等之前,尝试先了解一下 Dataflow。
机器学习的一般用例:
正在摄取单独处理的文档。
除了易于编写的转换之外,我们还想根据对数据库(主要是键值存储)的查询来丰富每个文档。
地名词典就是一个简单的例子:将文本分解为 ngram,然后检查这些 ngram 是否驻留在某个数据库中,并记录(在原始文档的转换版本中)实体标识符给定的短语映射到。
如何高效地做到这一点?
NAIVE(尽管序列化要求可能很棘手?):
每个文档都可以简单地单独查询数据库(类似于 ),但是,考虑到其中大部分都是简单的键值存储,似乎应该有更有效的方法来做到这一点(考虑到数据库查询延迟的实际问题)。
场景 #1:改进?:
当前的 strawman 是将表存储在 Bigquery 中,将它们拉下来 (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py),然后将它们用作辅助输入,用作 per-doc 函数中的键值查找).
键值表的范围从通常非常小到不太大(100 MB,可能是低 GB)。 Multiple CoGroupByKey with same key apache beam ("Side inputs can be arbitrarily large - there is no limit; we have seen pipelines successfully run using side inputs of 1+TB in size") 表明这是合理的,至少从尺寸 POV 来看是这样。
1) 这有意义吗?这是这个场景的 "correct" 设计模式吗?
2) 如果这是一个好的设计模式...我该如何实际实现它?
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L53 显示将结果作为 AsList 提供给文档函数。
i) 据推测,对于上述用例,AsDict 在这里更合适?所以我可能需要先对 Bigquery 输出进行 运行 一些转换,以将其分离为键值元组;并确保密钥是唯一的;然后将其用作辅助输入。
ii) 然后我需要在函数中使用侧输入
我不清楚的地方:
对于这两者,如何操纵 Bigquery pull 的输出对我来说是模糊的。我将如何完成 (i)(假设有必要)?意思是,数据格式是什么样的(原始字节?字符串?有没有我可以研究的好例子?)
同样,如果 AsDict 是将其传递到 func 的正确方法,我可以只引用像通常在 python 中使用的字典那样的东西吗?例如,side_input.get('blah') ?
情景 #2:进一步改进? (对于特定情况):
- 上述场景——如果可以实现的话——看起来确实是高级连续远程调用(考虑到简单的键值查找),并且对我们的某些场景非常有帮助。但是,如果我采用像地名词典查找(如上)这样的场景……是否有更优化的解决方案?
比如,对于每个文档,将我们所有的 ngram 写为键,将值作为基础索引(文档中的 docid+索引),然后在这些 ngram 和我们的短语之间进行某种连接地名词典...然后进行另一组转换以恢复原始文档(现在带有新注释)。
也就是说,让 Beam 直接处理所有 joins/lookups?
理论上的优势是,Beam 执行此操作可能比针对每个文档循环遍历所有 ngram 并检查 ngram 是否在 side_input.[=15= 中要快得多]
其他关键问题:
3) 如果这是一个很好的做事方式,有没有什么技巧可以让它在流媒体场景中很好地工作?其他地方的文本表明,在批处理场景之外,侧输入缓存的工作效果更差。目前,我们专注于批处理,但流式处理将与提供实时预测相关。
4) 有任何与 Beam 相关的理由更喜欢 Java>Python 以上任何一项吗?我们有大量现有的 Python 代码可以迁移到 Dataflow,因此我们更喜欢 Python...但不确定上面的 Python 是否存在任何隐藏问题(例如,我注意到 Python 不支持某些功能或 I/O)。
编辑:稻草人?对于示例 ngram 查找场景(应该强烈概括为一般 K:V 查找)
- 短语 = 从 bigquery 获取
- Docs(由 docid 索引)(直接从文本或 protobufs 输入,例如)
- 转换:短语 -> (短语,实体)元组
- 转换:docs -> ngrams(短语、docid、坐标[在文档中])
- CoGroupByKey key=phrase: (phrase, entity, docid, coords)
- CoGroupByKey key=docid, group((phrase, entity, docid, coords), Docs)
- 然后我们可以使用(phrase, entity, docid, coords) 的集合和每个文档迭代地完成每个文档
关于您管道的场景:
- 天真的场景
你是对的,数据库的每个元素查询是不可取的。
如果您的键值存储能够通过重复使用打开的连接来支持低延迟查找,您可以定义一个全局连接,每个工作人员初始化一次而不是一次每捆。这应该是可以接受的,您的 k-v 存储支持对现有连接的高效查找。
- 改进场景
如果这不可行,那么 BQ 是保存和提取数据的好方法。
您绝对可以使用 AsDict
侧输入,只需转到 side_input[my_key]
或 side_input.get(my_key)
。
您的管道可能看起来像这样:
kv_query = "SELECT key, value FROM my:table.name"
p = beam.Pipeline()
documents_pcoll = p | ReadDocuments()
additional_data_pcoll = (p
| beam.io.BigQuerySource(query=kv_query)
# Make row a key-value tuple.
| 'format bq' >> beam.Map(lambda row: (row['key'], row['value'])))
enriched_docs = (documents_pcoll
| 'join' >> beam.Map(lambda doc, query: enrich_doc(doc, query[doc['key']]),
query=AsDict(additional_data_pcoll)))
不幸的是,这有一个缺点,那就是 Python 目前不支持任意大的边输入(它目前将所有 K-V 加载到一个 Python 字典中)。如果你的侧输入数据很大,那么你会想要避免这个选项。
注意这以后会改变,但我们不能确定ATM。
- 进一步完善
加入 两个数据集的另一种方法是使用CoGroupByKey
。文档和 K-V 附加数据的加载不应该改变,但是加入时,你会做这样的事情:
# Turn the documents into key-value tuples as well[
documents_kv_pcoll = (documents_pcoll
| 'format docs' >> beam.Map(lambda doc: (doc['key'], doc)))
enriched_docs = ({'docs': documents_kv_pcoll, 'additional_data': additional_data_pcoll}
| beam.CoGroupByKey()
| 'enrich' >> beam.Map(lambda x: enrich_doc(x['docs'][0], x['additional_data'][0]))
CoGroupByKey
将允许您在任一侧使用任意大的集合。
回答您的问题
如果您需要了解更多,可以查看 example of using BigQuery as a side input in the cookbook. As you can see there, the data comes parsed (I believe that it comes in their original data types, but it may come in string/unicode). Check the docs(或随时询问)。
目前,Python streaming 处于 alpha 阶段,不支持侧输入;但它确实支持 CoGroupByKey
等随机播放功能。您使用 CoGroupByKey
的流水线应该在流式传输中运行良好。
比 Python 更喜欢 Java 的一个原因是所有这些功能都适用于 Java(无限大小的侧输入,流式侧输入)。但似乎对于您的用例,Python 可能拥有您所需要的一切。
注意:代码片段是近似值,但您应该能够使用 DirectRunner
.
调试它们
如果您觉得有帮助,请随时要求澄清,或询问其他方面。
正在评估 Dataflow,我正在尝试找出 if/how 来执行以下操作。
我很抱歉,如果上面的任何内容都是微不足道的——在我们决定使用 Beam 或其他类似 Spark 等之前,尝试先了解一下 Dataflow。
机器学习的一般用例:
正在摄取单独处理的文档。
除了易于编写的转换之外,我们还想根据对数据库(主要是键值存储)的查询来丰富每个文档。
地名词典就是一个简单的例子:将文本分解为 ngram,然后检查这些 ngram 是否驻留在某个数据库中,并记录(在原始文档的转换版本中)实体标识符给定的短语映射到。
如何高效地做到这一点?
NAIVE(尽管序列化要求可能很棘手?):
每个文档都可以简单地单独查询数据库(类似于
场景 #1:改进?:
当前的 strawman 是将表存储在 Bigquery 中,将它们拉下来 (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py),然后将它们用作辅助输入,用作 per-doc 函数中的键值查找).
键值表的范围从通常非常小到不太大(100 MB,可能是低 GB)。 Multiple CoGroupByKey with same key apache beam ("Side inputs can be arbitrarily large - there is no limit; we have seen pipelines successfully run using side inputs of 1+TB in size") 表明这是合理的,至少从尺寸 POV 来看是这样。
1) 这有意义吗?这是这个场景的 "correct" 设计模式吗?
2) 如果这是一个好的设计模式...我该如何实际实现它?
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L53 显示将结果作为 AsList 提供给文档函数。
i) 据推测,对于上述用例,AsDict 在这里更合适?所以我可能需要先对 Bigquery 输出进行 运行 一些转换,以将其分离为键值元组;并确保密钥是唯一的;然后将其用作辅助输入。
ii) 然后我需要在函数中使用侧输入
我不清楚的地方:
对于这两者,如何操纵 Bigquery pull 的输出对我来说是模糊的。我将如何完成 (i)(假设有必要)?意思是,数据格式是什么样的(原始字节?字符串?有没有我可以研究的好例子?)
同样,如果 AsDict 是将其传递到 func 的正确方法,我可以只引用像通常在 python 中使用的字典那样的东西吗?例如,side_input.get('blah') ?
情景 #2:进一步改进? (对于特定情况):
- 上述场景——如果可以实现的话——看起来确实是高级连续远程调用(考虑到简单的键值查找),并且对我们的某些场景非常有帮助。但是,如果我采用像地名词典查找(如上)这样的场景……是否有更优化的解决方案?
比如,对于每个文档,将我们所有的 ngram 写为键,将值作为基础索引(文档中的 docid+索引),然后在这些 ngram 和我们的短语之间进行某种连接地名词典...然后进行另一组转换以恢复原始文档(现在带有新注释)。
也就是说,让 Beam 直接处理所有 joins/lookups?
理论上的优势是,Beam 执行此操作可能比针对每个文档循环遍历所有 ngram 并检查 ngram 是否在 side_input.[=15= 中要快得多]
其他关键问题:
3) 如果这是一个很好的做事方式,有没有什么技巧可以让它在流媒体场景中很好地工作?其他地方的文本表明,在批处理场景之外,侧输入缓存的工作效果更差。目前,我们专注于批处理,但流式处理将与提供实时预测相关。
4) 有任何与 Beam 相关的理由更喜欢 Java>Python 以上任何一项吗?我们有大量现有的 Python 代码可以迁移到 Dataflow,因此我们更喜欢 Python...但不确定上面的 Python 是否存在任何隐藏问题(例如,我注意到 Python 不支持某些功能或 I/O)。
编辑:稻草人?对于示例 ngram 查找场景(应该强烈概括为一般 K:V 查找)
- 短语 = 从 bigquery 获取
- Docs(由 docid 索引)(直接从文本或 protobufs 输入,例如)
- 转换:短语 -> (短语,实体)元组
- 转换:docs -> ngrams(短语、docid、坐标[在文档中])
- CoGroupByKey key=phrase: (phrase, entity, docid, coords)
- CoGroupByKey key=docid, group((phrase, entity, docid, coords), Docs)
- 然后我们可以使用(phrase, entity, docid, coords) 的集合和每个文档迭代地完成每个文档
关于您管道的场景:
- 天真的场景
你是对的,数据库的每个元素查询是不可取的。
如果您的键值存储能够通过重复使用打开的连接来支持低延迟查找,您可以定义一个全局连接,每个工作人员初始化一次而不是一次每捆。这应该是可以接受的,您的 k-v 存储支持对现有连接的高效查找。
- 改进场景
如果这不可行,那么 BQ 是保存和提取数据的好方法。
您绝对可以使用 AsDict
侧输入,只需转到 side_input[my_key]
或 side_input.get(my_key)
。
您的管道可能看起来像这样:
kv_query = "SELECT key, value FROM my:table.name"
p = beam.Pipeline()
documents_pcoll = p | ReadDocuments()
additional_data_pcoll = (p
| beam.io.BigQuerySource(query=kv_query)
# Make row a key-value tuple.
| 'format bq' >> beam.Map(lambda row: (row['key'], row['value'])))
enriched_docs = (documents_pcoll
| 'join' >> beam.Map(lambda doc, query: enrich_doc(doc, query[doc['key']]),
query=AsDict(additional_data_pcoll)))
不幸的是,这有一个缺点,那就是 Python 目前不支持任意大的边输入(它目前将所有 K-V 加载到一个 Python 字典中)。如果你的侧输入数据很大,那么你会想要避免这个选项。
注意这以后会改变,但我们不能确定ATM。
- 进一步完善
加入 两个数据集的另一种方法是使用CoGroupByKey
。文档和 K-V 附加数据的加载不应该改变,但是加入时,你会做这样的事情:
# Turn the documents into key-value tuples as well[
documents_kv_pcoll = (documents_pcoll
| 'format docs' >> beam.Map(lambda doc: (doc['key'], doc)))
enriched_docs = ({'docs': documents_kv_pcoll, 'additional_data': additional_data_pcoll}
| beam.CoGroupByKey()
| 'enrich' >> beam.Map(lambda x: enrich_doc(x['docs'][0], x['additional_data'][0]))
CoGroupByKey
将允许您在任一侧使用任意大的集合。
回答您的问题
如果您需要了解更多,可以查看 example of using BigQuery as a side input in the cookbook. As you can see there, the data comes parsed (I believe that it comes in their original data types, but it may come in string/unicode). Check the docs(或随时询问)。
目前,Python streaming 处于 alpha 阶段,不支持侧输入;但它确实支持
CoGroupByKey
等随机播放功能。您使用CoGroupByKey
的流水线应该在流式传输中运行良好。比 Python 更喜欢 Java 的一个原因是所有这些功能都适用于 Java(无限大小的侧输入,流式侧输入)。但似乎对于您的用例,Python 可能拥有您所需要的一切。
注意:代码片段是近似值,但您应该能够使用 DirectRunner
.
如果您觉得有帮助,请随时要求澄清,或询问其他方面。