结合 groupByKey 和未绑定的流

combine with groupByKey with unbound stream

管道是:

  1. PCollections<String> readTopic = PubSubIO.readString() ...
  2. PCollection<String> windowSession = readTopic.apply(Window.<String>into(Sessions
                                .withGapDuration(Duration.standardHours(1))));
  3.PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new 
     ParseEventFn());
  4. PCollection<KV<post, Iterable<user>> iterableKV= 
                        KVparsedPosts.apply(GroupByKey.create())
  5.PCollectionList<KV<post, Iterable<user>>listPosts = 
        iterableKV.apply(Combine.globally(new CombinePosts()).withoutDefaults())
  6.listPosts.apply(ParDo.of(new writePosts())

输入是代表一个用户的消息,其中包含与该用户相关的 post 数组, Post 不是唯一的(相同的 post 可能与许多用户相关) 我的问题是第 5 步合并,我不确定如何合并 KV<...> 以列出 writePost 将写入 D.B。一批 post 而不仅仅是一个 post,

示例:假设我们有用户:A、B、C 和 Posts:1、2、3、4 还有我们从 PubSub 读取的消息:

{user:a, posts:[1,2,4]}
{user:b, posts:[2,3]}
{user:c, posts:[2,4]}

我想这样组合:

{post:1, user:[a]}
{post:2, user:[a,b,c]}
{post:3, user:[b]}
{post:4, user:[a,c]}

如果我对你的问题的理解正确,那么看起来你已经想出了如何修改元素以获得示例中的预期结果,唯一剩下的问题是如何批量处理多个 post在一起,对吗?

我将针对您的示例(与您现有的代码匹配)以及多个 post 的批处理重述我建议的方法。

第 1 步: 从自定义 DoFn 创建 ParDo,将用户列表和 posts 转换为 的 KV .例如:

{user:a, posts:[1,2,4]} -> {post:1, user:a}, {post:2, user:a}, {post:4, user:a}

您似乎已经在这一行中这样做了:3. PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new ParseEventFn());

第 2 步: 执行 GroupByKey,使用 post 作为键。例如:

{post:2, user:a}, {post:2, user:b}, {post:2, user:c} -> {post:2, user:[a,b,c]}

您似乎在这条线上这样做:4. PCollection<KV<post, Iterable<user>> iterableKV= KVparsedPosts.apply(GroupByKey.create())

步骤 3(可选): 使用上面的代码,您可以一次向数据库写入一个 post。如果要对多个 post 进行批处理,则需要将多个 post 分组,这可以通过 GroupIntoBatches transform. In order to use this transform, you will need to assign all elements a placeholder key to batch them by, which can be done with WithKeys 来完成。例如:

PCollection<KV<post, Iterable<user>> iterableKV =
        KVparsedPosts.apply(GroupByKey.create());

// Apply a key of 0 to every element.
PCollection<KV<Integer, KV<post, Iterable<user>>>> keyedPosts =
        iterableKV.apply(WithKeys.of(0));

// Group all elements into batches of 5 posts.
PCollection<KV<Integer, Iterable<KV<post, Iterable<user>>>>> batchedPosts =
        keyedPosts.apply(GroupIntoBatches.<Integer, KV<post, Iterable<user>>>ofSize(5));

批处理后,可以删除占位符键并可以完成任何后续工作。