合并 Google 数据流中的 PCollections
Merge PCollections in Google Dataflow
我是 Google Dataflow 的新手,正在使用目录数据,我想在其中加入不同语言的目录。现在,我的流程自己解析这两种语言,并为每种语言创建一个 PCollection。此 PCollection 包含一个元素,该元素包含元数据、所有文章和所有类别。该模型已准备好容纳多种语言,我现在需要做的就是合并这 2 个元素(PCollections)。
但是.... Google 数据流文档没有提供有关如何合并这些集合的示例 "by hand" 并且应用上的输入始终需要特定的输入类型。
我尝试了 PCollectionList 但找不到可以处理所有包含 PCollection 及其实体的解决方案。然后我写了一个 Custom CombineFn 但我不知道如何应用它,所以两个 Collections 都合并了。
所以我有 2 个 PCollection catalogCollection_de 和 catalogCollection_fr。我如何处理这些以自定义加入它们并获得单个 PCollection?
非常感谢任何帮助
编辑:关于 "merge/join" 在此上下文中的含义的问题。
PCollections 对象包含一个名为 CatalogCollection 的 Pojo,其中包含元数据、类别和文章。这些 Pojos 中的每一个都有不同的文本,其中每个文本由 Map<LanguageKey, Description>
表示。对于每种语言,都有一个保存此 CatalogCollection 的 PCollection。当我想 join/merge 它们时,我想将所有语言收集在一个 PCollection 中,这意味着,例如,对于每个 Article,我需要找到相应的另一种语言的 Article,并将所有描述放入第一个集合,最后应该收集所有语言。
您要查找的转换是 Flatten:
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());
另请注意:我注意到您说 "this collection contains one element" 这表明您也可能 运行 遇到问题,因为 Dataflow 将元素分布到多个工作线程以进行并行处理。只有元素,您可能无法获得预期的并行性优势。
对于我的用例,似乎要走的路是使用带有 sideinput 的 ParDo 并手动加入 PoJos(由于我的数据结构)。还有一个更新的管道,其中 PCollections 被分解为文章/类别等。我不得不自定义合并它们。
现在的问题是,从 Datastore 读取将不起作用,但那是在另一个线程中。
我是 Google Dataflow 的新手,正在使用目录数据,我想在其中加入不同语言的目录。现在,我的流程自己解析这两种语言,并为每种语言创建一个 PCollection。此 PCollection 包含一个元素,该元素包含元数据、所有文章和所有类别。该模型已准备好容纳多种语言,我现在需要做的就是合并这 2 个元素(PCollections)。
但是.... Google 数据流文档没有提供有关如何合并这些集合的示例 "by hand" 并且应用上的输入始终需要特定的输入类型。 我尝试了 PCollectionList 但找不到可以处理所有包含 PCollection 及其实体的解决方案。然后我写了一个 Custom CombineFn 但我不知道如何应用它,所以两个 Collections 都合并了。
所以我有 2 个 PCollection catalogCollection_de 和 catalogCollection_fr。我如何处理这些以自定义加入它们并获得单个 PCollection?
非常感谢任何帮助
编辑:关于 "merge/join" 在此上下文中的含义的问题。
PCollections 对象包含一个名为 CatalogCollection 的 Pojo,其中包含元数据、类别和文章。这些 Pojos 中的每一个都有不同的文本,其中每个文本由 Map<LanguageKey, Description>
表示。对于每种语言,都有一个保存此 CatalogCollection 的 PCollection。当我想 join/merge 它们时,我想将所有语言收集在一个 PCollection 中,这意味着,例如,对于每个 Article,我需要找到相应的另一种语言的 Article,并将所有描述放入第一个集合,最后应该收集所有语言。
您要查找的转换是 Flatten:
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());
另请注意:我注意到您说 "this collection contains one element" 这表明您也可能 运行 遇到问题,因为 Dataflow 将元素分布到多个工作线程以进行并行处理。只有元素,您可能无法获得预期的并行性优势。
对于我的用例,似乎要走的路是使用带有 sideinput 的 ParDo 并手动加入 PoJos(由于我的数据结构)。还有一个更新的管道,其中 PCollections 被分解为文章/类别等。我不得不自定义合并它们。 现在的问题是,从 Datastore 读取将不起作用,但那是在另一个线程中。