Apache Beam Go SDK:如何将 PCollection<string> 转换为 PCollection<KV<string, string>>?

Apache Beam Go SDK: how to convert PCollection<string> to PCollection<KV<string, string>>?

我正在使用 Apache Beam Go SDK,但很难通过密钥获得 PCollection 的正确格式 grouping/combining。

我在 PCollection 字符串中的每个键都有多个记录,如下所示:

Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse

我想使用 GroupByKeyCombinePerKey 这样我就可以像这样汇总每个人的宠物:

Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]

如何将 PCollection 转换为 PCollection>?

他们提到了类似的东西 here,但不包括聚合字符串值的代码。

我可以使用 ParDo 获取字符串键和字符串值,如下所示,但我不知道如何转换为所需的 KV 或 CoGBK 格式输入到 GroupPerKey。

pcolOut := beam.ParDo(s, func(line string) (string, string) {
  cleanString := strings.TrimSpace(line)
  openingChar := ","
  iStart := strings.Index(cleanString, openingChar)
  key := cleanString[0:iStart]
  value := cleanString[iStart+1:]
        
// How to convert to PCollection<KV<string, string>> before returning?
  return key, value
}, pcolIn)

groupedKV := beam.GroupByKey(s, pcolOut) 

失败并出现以下错误。有什么建议吗?

panic:  inserting ParDo in scope root
        creating new DoFn in scope root
        binding fn main.main.func2
        binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}

要映射到KV,可以应用MapElements,使用into()设置KV类型,在via()逻辑中,新建一个KV.of(myKey, myValue),比如得到一个KV<String,String>, 使用这样的东西:

    PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
        TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptors.strings()))
        .via(
            linkpage -> KV.of(dataFile, linkpage)));