结合 groupByKey 和未绑定的流
combine with groupByKey with unbound stream
管道是:
1. PCollections<String> readTopic = PubSubIO.readString() ...
2. PCollection<String> windowSession = readTopic.apply(Window.<String>into(Sessions
.withGapDuration(Duration.standardHours(1))));
3.PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new
ParseEventFn());
4. PCollection<KV<post, Iterable<user>> iterableKV=
KVparsedPosts.apply(GroupByKey.create())
5.PCollectionList<KV<post, Iterable<user>>listPosts =
iterableKV.apply(Combine.globally(new CombinePosts()).withoutDefaults())
6.listPosts.apply(ParDo.of(new writePosts())
输入是代表一个用户的消息,其中包含与该用户相关的 post 数组,
Post 不是唯一的(相同的 post 可能与许多用户相关)
我的问题是第 5 步合并,我不确定如何合并 KV<...> 以列出 writePost
将写入 D.B。一批 post 而不仅仅是一个 post,
示例:假设我们有用户:A、B、C 和 Posts:1、2、3、4
还有我们从 PubSub 读取的消息:
{user:a, posts:[1,2,4]}
{user:b, posts:[2,3]}
{user:c, posts:[2,4]}
我想这样组合:
{post:1, user:[a]}
{post:2, user:[a,b,c]}
{post:3, user:[b]}
{post:4, user:[a,c]}
如果我对你的问题的理解正确,那么看起来你已经想出了如何修改元素以获得示例中的预期结果,唯一剩下的问题是如何批量处理多个 post在一起,对吗?
我将针对您的示例(与您现有的代码匹配)以及多个 post 的批处理重述我建议的方法。
第 1 步: 从自定义 DoFn 创建 ParDo,将用户列表和 posts 转换为 的 KV .例如:
{user:a, posts:[1,2,4]} -> {post:1, user:a}, {post:2, user:a}, {post:4, user:a}
您似乎已经在这一行中这样做了:3. PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new ParseEventFn());
第 2 步: 执行 GroupByKey,使用 post 作为键。例如:
{post:2, user:a}, {post:2, user:b}, {post:2, user:c} -> {post:2, user:[a,b,c]}
您似乎在这条线上这样做:4. PCollection<KV<post, Iterable<user>> iterableKV= KVparsedPosts.apply(GroupByKey.create())
步骤 3(可选): 使用上面的代码,您可以一次向数据库写入一个 post。如果要对多个 post 进行批处理,则需要将多个 post 分组,这可以通过 GroupIntoBatches transform. In order to use this transform, you will need to assign all elements a placeholder key to batch them by, which can be done with WithKeys 来完成。例如:
PCollection<KV<post, Iterable<user>> iterableKV =
KVparsedPosts.apply(GroupByKey.create());
// Apply a key of 0 to every element.
PCollection<KV<Integer, KV<post, Iterable<user>>>> keyedPosts =
iterableKV.apply(WithKeys.of(0));
// Group all elements into batches of 5 posts.
PCollection<KV<Integer, Iterable<KV<post, Iterable<user>>>>> batchedPosts =
keyedPosts.apply(GroupIntoBatches.<Integer, KV<post, Iterable<user>>>ofSize(5));
批处理后,可以删除占位符键并可以完成任何后续工作。
管道是:
1. PCollections<String> readTopic = PubSubIO.readString() ...
2. PCollection<String> windowSession = readTopic.apply(Window.<String>into(Sessions
.withGapDuration(Duration.standardHours(1))));
3.PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new
ParseEventFn());
4. PCollection<KV<post, Iterable<user>> iterableKV=
KVparsedPosts.apply(GroupByKey.create())
5.PCollectionList<KV<post, Iterable<user>>listPosts =
iterableKV.apply(Combine.globally(new CombinePosts()).withoutDefaults())
6.listPosts.apply(ParDo.of(new writePosts())
输入是代表一个用户的消息,其中包含与该用户相关的 post 数组,
Post 不是唯一的(相同的 post 可能与许多用户相关)
我的问题是第 5 步合并,我不确定如何合并 KV<...> 以列出 writePost
将写入 D.B。一批 post 而不仅仅是一个 post,
示例:假设我们有用户:A、B、C 和 Posts:1、2、3、4 还有我们从 PubSub 读取的消息:
{user:a, posts:[1,2,4]}
{user:b, posts:[2,3]}
{user:c, posts:[2,4]}
我想这样组合:
{post:1, user:[a]}
{post:2, user:[a,b,c]}
{post:3, user:[b]}
{post:4, user:[a,c]}
如果我对你的问题的理解正确,那么看起来你已经想出了如何修改元素以获得示例中的预期结果,唯一剩下的问题是如何批量处理多个 post在一起,对吗?
我将针对您的示例(与您现有的代码匹配)以及多个 post 的批处理重述我建议的方法。
第 1 步: 从自定义 DoFn 创建 ParDo,将用户列表和 posts 转换为
{user:a, posts:[1,2,4]} -> {post:1, user:a}, {post:2, user:a}, {post:4, user:a}
您似乎已经在这一行中这样做了:3. PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new ParseEventFn());
第 2 步: 执行 GroupByKey,使用 post 作为键。例如:
{post:2, user:a}, {post:2, user:b}, {post:2, user:c} -> {post:2, user:[a,b,c]}
您似乎在这条线上这样做:4. PCollection<KV<post, Iterable<user>> iterableKV= KVparsedPosts.apply(GroupByKey.create())
步骤 3(可选): 使用上面的代码,您可以一次向数据库写入一个 post。如果要对多个 post 进行批处理,则需要将多个 post 分组,这可以通过 GroupIntoBatches transform. In order to use this transform, you will need to assign all elements a placeholder key to batch them by, which can be done with WithKeys 来完成。例如:
PCollection<KV<post, Iterable<user>> iterableKV =
KVparsedPosts.apply(GroupByKey.create());
// Apply a key of 0 to every element.
PCollection<KV<Integer, KV<post, Iterable<user>>>> keyedPosts =
iterableKV.apply(WithKeys.of(0));
// Group all elements into batches of 5 posts.
PCollection<KV<Integer, Iterable<KV<post, Iterable<user>>>>> batchedPosts =
keyedPosts.apply(GroupIntoBatches.<Integer, KV<post, Iterable<user>>>ofSize(5));
批处理后,可以删除占位符键并可以完成任何后续工作。