是否可以从 PubSub 读取消息并将其数据分离到 PCollection<String> 的不同元素中?如果是这样,如何?
Is it possible to read a message from a PubSub and separate its data in different elements of a PCollection<String>? If so, how?
现在,我有以下代码:
PCollection<String> input_data =
pipeline
.apply(PubsubIO
.Read
.withCoder(StringUtf8Coder.of())
.named("ReadFromPubSub")
.subscription("/subscriptions/project_name/subscription_name"));
我认为你的意思是你想要的数据存在于 PCollection 的不同元素中,并且想以某种方式提取和分组它。
一种可能的方法是编写一个 DoFn 函数来处理 PCollection 中的每个字符串。您为要分组的每条数据输出一个键值对。然后,您可以使用 GroupByKey 转换将所有相关数据组合在一起。
例如,您的 PCollection 中有来自 pubsub 的以下消息:
- 用户1234购买了商品A
- 用户1234购买了商品B
DoFn函数会输出一个键值对,用户id为键,购买的商品为值。 ( <1234,A> , <1234, B> )。
使用 GroupByKey 转换,您可以将两个值组合在一个元素中。然后您可以对该元素执行进一步处理。
这是大数据中一种非常常见的模式,称为 mapreduce。
您似乎想从 pubsub 读取一些消息,并通过在 space 个字符上拆分一条消息将它们中的每一个转换为多个部分,然后将这些部分提供给管道的其余部分。不需要 PubsubIO 的特殊配置,因为它不是 "reading data" 问题 - 这是 "transforming data you have already read" 问题 - 你只需要插入一个 ParDo 来获取你的 "composite" 记录并将其分解在您想要的方式,例如:
PCollection<String> input_data =
pipeline
.apply(PubsubIO
.Read
.withCoder(StringUtf8Coder.of())
.named("ReadFromPubSub")
.subscription("/subscriptions/project_name/subscription_name"))
.apply(ParDo.of(new DoFn<String, String>() {
public void processElement(ProcessContext c) {
String composite = c.element();
for (String part : composite.split(" ")) {
c.output(part);
}
}}));
}));
您可以输出一个 Iterable<A>
然后使用 Flatten
压缩它。毫不奇怪,这在许多下一代数据处理平台 c.f 中被称为 flatMap
。火花/闪烁。
现在,我有以下代码:
PCollection<String> input_data =
pipeline
.apply(PubsubIO
.Read
.withCoder(StringUtf8Coder.of())
.named("ReadFromPubSub")
.subscription("/subscriptions/project_name/subscription_name"));
我认为你的意思是你想要的数据存在于 PCollection 的不同元素中,并且想以某种方式提取和分组它。
一种可能的方法是编写一个 DoFn 函数来处理 PCollection 中的每个字符串。您为要分组的每条数据输出一个键值对。然后,您可以使用 GroupByKey 转换将所有相关数据组合在一起。
例如,您的 PCollection 中有来自 pubsub 的以下消息:
- 用户1234购买了商品A
- 用户1234购买了商品B
DoFn函数会输出一个键值对,用户id为键,购买的商品为值。 ( <1234,A> , <1234, B> )。 使用 GroupByKey 转换,您可以将两个值组合在一个元素中。然后您可以对该元素执行进一步处理。
这是大数据中一种非常常见的模式,称为 mapreduce。
您似乎想从 pubsub 读取一些消息,并通过在 space 个字符上拆分一条消息将它们中的每一个转换为多个部分,然后将这些部分提供给管道的其余部分。不需要 PubsubIO 的特殊配置,因为它不是 "reading data" 问题 - 这是 "transforming data you have already read" 问题 - 你只需要插入一个 ParDo 来获取你的 "composite" 记录并将其分解在您想要的方式,例如:
PCollection<String> input_data =
pipeline
.apply(PubsubIO
.Read
.withCoder(StringUtf8Coder.of())
.named("ReadFromPubSub")
.subscription("/subscriptions/project_name/subscription_name"))
.apply(ParDo.of(new DoFn<String, String>() {
public void processElement(ProcessContext c) {
String composite = c.element();
for (String part : composite.split(" ")) {
c.output(part);
}
}}));
}));
您可以输出一个 Iterable<A>
然后使用 Flatten
压缩它。毫不奇怪,这在许多下一代数据处理平台 c.f 中被称为 flatMap
。火花/闪烁。