Apache Beam Go SDK:如何将 PCollection<string> 转换为 PCollection<KV<string, string>>?
Apache Beam Go SDK: how to convert PCollection<string> to PCollection<KV<string, string>>?
我正在使用 Apache Beam Go SDK,但很难通过密钥获得 PCollection 的正确格式 grouping/combining。
我在 PCollection 字符串中的每个键都有多个记录,如下所示:
Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse
我想使用 GroupByKey 和 CombinePerKey 这样我就可以像这样汇总每个人的宠物:
Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]
如何将 PCollection 转换为 PCollection>?
他们提到了类似的东西 here,但不包括聚合字符串值的代码。
我可以使用 ParDo 获取字符串键和字符串值,如下所示,但我不知道如何转换为所需的 KV 或 CoGBK 格式输入到 GroupPerKey。
pcolOut := beam.ParDo(s, func(line string) (string, string) {
cleanString := strings.TrimSpace(line)
openingChar := ","
iStart := strings.Index(cleanString, openingChar)
key := cleanString[0:iStart]
value := cleanString[iStart+1:]
// How to convert to PCollection<KV<string, string>> before returning?
return key, value
}, pcolIn)
groupedKV := beam.GroupByKey(s, pcolOut)
失败并出现以下错误。有什么建议吗?
panic: inserting ParDo in scope root
creating new DoFn in scope root
binding fn main.main.func2
binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}
要映射到KV,可以应用MapElements,使用into()设置KV类型,在via()逻辑中,新建一个KV.of(myKey, myValue)
,比如得到一个KV<String,String>
, 使用这样的东西:
PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(),
TypeDescriptors.strings()))
.via(
linkpage -> KV.of(dataFile, linkpage)));
我正在使用 Apache Beam Go SDK,但很难通过密钥获得 PCollection 的正确格式 grouping/combining。
我在 PCollection 字符串中的每个键都有多个记录,如下所示:
Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse
我想使用 GroupByKey 和 CombinePerKey 这样我就可以像这样汇总每个人的宠物:
Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]
如何将 PCollection
他们提到了类似的东西 here,但不包括聚合字符串值的代码。
我可以使用 ParDo 获取字符串键和字符串值,如下所示,但我不知道如何转换为所需的 KV
pcolOut := beam.ParDo(s, func(line string) (string, string) {
cleanString := strings.TrimSpace(line)
openingChar := ","
iStart := strings.Index(cleanString, openingChar)
key := cleanString[0:iStart]
value := cleanString[iStart+1:]
// How to convert to PCollection<KV<string, string>> before returning?
return key, value
}, pcolIn)
groupedKV := beam.GroupByKey(s, pcolOut)
失败并出现以下错误。有什么建议吗?
panic: inserting ParDo in scope root
creating new DoFn in scope root
binding fn main.main.func2
binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}
要映射到KV,可以应用MapElements,使用into()设置KV类型,在via()逻辑中,新建一个KV.of(myKey, myValue)
,比如得到一个KV<String,String>
, 使用这样的东西:
PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(),
TypeDescriptors.strings()))
.via(
linkpage -> KV.of(dataFile, linkpage)));