Apache Beam 中的嵌套前 N 个
Nested Top N in Apache Beam
我正在使用 Apache Beam 2.14 和 Java。
给定一个如下所示的数据集:
| countryID | sessionID | pageID | count |
| --------- | --------- | --------- | -------- |
| a | a | a | 1 |
| a | b | c | 2 |
| b | c | a | 4 |
| c | d | a | 6 |
我想 return 一个数据集,其中只有计数总和在前 N 个国家/地区 ID 中的行,对于每个国家/地区 ID,前 N 个会话,对于每个会话 ID,前 N 个页面 ID。
数据集的大小有数十亿行 - 内存放不下。
顺便说一句 - 数据集驻留在 BigQuery 中,并尝试使用 DENSE_RANK() 或 ROW_NUMBER() 函数错误直接在 BigQuery 中执行此操作,由于此大小而导致 "memory limit exceeded" 错误,因此尝试改用 Dataflow。
我目前的策略是:
- 按countryID、sessionID、pageID的组合键分组,求各组之和。
- 按 countryID、sessionID 对结果进行分组,并求出每组的总和。
- 按 countryID 对结果进行分组并求出每组的总和。
- 使用
Top.of
获取排名靠前的国家/地区 ID
- 将结果展平回到第 2 级分组并使用
Top.perKey
获得每个国家/地区的热门会话。
- 将结果展平到第 1 级分组并获取每个会话的顶部 pageID。
- 展平结果以发出行。
棘手的部分是需要在每个 "group by" 级别保留行,以便它们可以在最后发出。
我试图创建一个树结构,其中每个节点都包含 "group by" 步骤的结果 - 这样我就可以计算它的子节点的总和一次,以便在后续步骤中进行比较。
即在每个 "group by" 步骤,结果是 KV<String, Iterable<Node>>
,并且节点具有如下字段:
@DefaultCoder(SerializableCoder.class)
public static class TreeNode implements Node, Serializable {
private Long total = 0L;
private KV<String, Iterable<LeafNode>> kv;
...
虽然这似乎几乎适用于直接 运行ner 和一个小样本数据集,但当 运行 在数据流上时,我遇到与 Node
类 由于 Iterable
是输入 PCollection:
的 window
Caused by: java.io.NotSerializableException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable
考虑到我需要处理的数据集大小,将数据复制到内存中的不同集合中以进行序列化并不是一个可行的选择。
这是到目前为止的管道示例 - 仅使用 2 级分组作为示例:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
.apply("Set first level key", WithKeys.of(new GroupKey(key1)))
.apply("Group by", GroupByKey.create())
.apply("to leaf nodes", ParDo.of(new ToLeafNode()))
.apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
.apply("Group by 2nd level", GroupByKey.create())
.apply("To tree nodes", ParDo.of(new ToTreeNode()))
.apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
.apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
.apply("Expand", ParDo.of(new ExpandTreeNode()))
.apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
.apply("Values", Values.create())
.apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
.apply("Expand", ParDo.of(new ExpandLeafNode()))
.apply("Values", Values.create())
.apply("Write to bq",
BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withFormatFunction(BigQueryUtils.toTableRow()));
pipeline.run();
这似乎应该是一个共同的目标,所以我想知道是否有更简单的方法,或者在 Java 中使用 Beam 实现相同目标的示例。
您可以尝试使用setCoder
设置代码如下。
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
.apply("Set first level key", WithKeys.of(new GroupKey(key1)))
.apply("Group by", GroupByKey.create())
.apply("to leaf nodes", ParDo.of(new ToLeafNode()))
.apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
.apply("Group by 2nd level", GroupByKey.create())
.apply("To tree nodes", ParDo.of(new ToTreeNode())).setCoder(SerializableCoder.of(TreeNode.class))
.apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
.apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
.apply("Expand", ParDo.of(new ExpandTreeNode()))
.apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
.apply("Values", Values.create())
.apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
.apply("Expand", ParDo.of(new ExpandLeafNode()))
.apply("Values", Values.create())
.apply("Write to bq",
BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withFormatFunction(BigQueryUtils.toTableRow()));
pipeline.run();
但是,对于需要确定前 N 个国家/地区、前 N 个会话和前 N 个页面的用例,我建议将管道简化为仅分别对正确的字段进行 GroupBy,然后应用 Sum
和 Top
如下。
Pipeline pipeline = Pipeline.create(options);
rows = pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA);
sumByCountry =rows.apply("Set Country key", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
.via((Row row) -> KV.of(row.getCountry(), row.getCount()))))
.apply("Country Scores", Sum.<String>integersPerKey());
.apply("Top Countries", Top.of(N, new CompareValues()))
// Do the same for Session and page
sumBySession = rows....
sumByPage = rows....
我不确定你是否想要获取前 N 个国家/地区的所有行,但如果你想获取这些行,则可以在行 PCollection 上使用前 N 个国家/地区的 side input 并过滤结果出来。您可以对 Session 和 Page 执行相同的操作。
Dataflow 应根据此用例的需要进行扩展,因此您无需为此用例手动执行中间 groupby。
我正在使用 Apache Beam 2.14 和 Java。
给定一个如下所示的数据集:
| countryID | sessionID | pageID | count |
| --------- | --------- | --------- | -------- |
| a | a | a | 1 |
| a | b | c | 2 |
| b | c | a | 4 |
| c | d | a | 6 |
我想 return 一个数据集,其中只有计数总和在前 N 个国家/地区 ID 中的行,对于每个国家/地区 ID,前 N 个会话,对于每个会话 ID,前 N 个页面 ID。
数据集的大小有数十亿行 - 内存放不下。 顺便说一句 - 数据集驻留在 BigQuery 中,并尝试使用 DENSE_RANK() 或 ROW_NUMBER() 函数错误直接在 BigQuery 中执行此操作,由于此大小而导致 "memory limit exceeded" 错误,因此尝试改用 Dataflow。
我目前的策略是:
- 按countryID、sessionID、pageID的组合键分组,求各组之和。
- 按 countryID、sessionID 对结果进行分组,并求出每组的总和。
- 按 countryID 对结果进行分组并求出每组的总和。
- 使用
Top.of
获取排名靠前的国家/地区 ID - 将结果展平回到第 2 级分组并使用
Top.perKey
获得每个国家/地区的热门会话。 - 将结果展平到第 1 级分组并获取每个会话的顶部 pageID。
- 展平结果以发出行。
棘手的部分是需要在每个 "group by" 级别保留行,以便它们可以在最后发出。
我试图创建一个树结构,其中每个节点都包含 "group by" 步骤的结果 - 这样我就可以计算它的子节点的总和一次,以便在后续步骤中进行比较。
即在每个 "group by" 步骤,结果是 KV<String, Iterable<Node>>
,并且节点具有如下字段:
@DefaultCoder(SerializableCoder.class)
public static class TreeNode implements Node, Serializable {
private Long total = 0L;
private KV<String, Iterable<LeafNode>> kv;
...
虽然这似乎几乎适用于直接 运行ner 和一个小样本数据集,但当 运行 在数据流上时,我遇到与 Node
类 由于 Iterable
是输入 PCollection:
Caused by: java.io.NotSerializableException: org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable
考虑到我需要处理的数据集大小,将数据复制到内存中的不同集合中以进行序列化并不是一个可行的选择。
这是到目前为止的管道示例 - 仅使用 2 级分组作为示例:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
.apply("Set first level key", WithKeys.of(new GroupKey(key1)))
.apply("Group by", GroupByKey.create())
.apply("to leaf nodes", ParDo.of(new ToLeafNode()))
.apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
.apply("Group by 2nd level", GroupByKey.create())
.apply("To tree nodes", ParDo.of(new ToTreeNode()))
.apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
.apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
.apply("Expand", ParDo.of(new ExpandTreeNode()))
.apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
.apply("Values", Values.create())
.apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
.apply("Expand", ParDo.of(new ExpandLeafNode()))
.apply("Values", Values.create())
.apply("Write to bq",
BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withFormatFunction(BigQueryUtils.toTableRow()));
pipeline.run();
这似乎应该是一个共同的目标,所以我想知道是否有更简单的方法,或者在 Java 中使用 Beam 实现相同目标的示例。
您可以尝试使用setCoder
设置代码如下。
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
.apply("Set first level key", WithKeys.of(new GroupKey(key1)))
.apply("Group by", GroupByKey.create())
.apply("to leaf nodes", ParDo.of(new ToLeafNode()))
.apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
.apply("Group by 2nd level", GroupByKey.create())
.apply("To tree nodes", ParDo.of(new ToTreeNode())).setCoder(SerializableCoder.of(TreeNode.class))
.apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
.apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
.apply("Expand", ParDo.of(new ExpandTreeNode()))
.apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
.apply("Values", Values.create())
.apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
.apply("Expand", ParDo.of(new ExpandLeafNode()))
.apply("Values", Values.create())
.apply("Write to bq",
BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withFormatFunction(BigQueryUtils.toTableRow()));
pipeline.run();
但是,对于需要确定前 N 个国家/地区、前 N 个会话和前 N 个页面的用例,我建议将管道简化为仅分别对正确的字段进行 GroupBy,然后应用 Sum
和 Top
如下。
Pipeline pipeline = Pipeline.create(options);
rows = pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA);
sumByCountry =rows.apply("Set Country key", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
.via((Row row) -> KV.of(row.getCountry(), row.getCount()))))
.apply("Country Scores", Sum.<String>integersPerKey());
.apply("Top Countries", Top.of(N, new CompareValues()))
// Do the same for Session and page
sumBySession = rows....
sumByPage = rows....
我不确定你是否想要获取前 N 个国家/地区的所有行,但如果你想获取这些行,则可以在行 PCollection 上使用前 N 个国家/地区的 side input 并过滤结果出来。您可以对 Session 和 Page 执行相同的操作。
Dataflow 应根据此用例的需要进行扩展,因此您无需为此用例手动执行中间 groupby。