Apache Beam Go SDK: how to convert PCollection<string> to PCollection<KV<string, string>>?

r3i60tvu, posted 2023-02-06 in Go

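I am using the Apache Beam Go SDK and am having trouble getting a PCollection into the right format for grouping/combining by key.
I have a PCollection of strings with multiple records per key, like this: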

Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse

I want to use GroupByKey or CombinePerKey so that I can aggregate each person's pets like this:

Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]

How do I convert a PCollection<string> to a PCollection<KV<string, string>>?

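They mention something similar here, but do not include the code for aggregating the string values.
I can use a ParDo to get a string key and a string value, as shown below, but I can't figure out how to convert to the KV<string, string> or CoGBK<string, string> format that GroupByKey requires as input.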

pcolOut := beam.ParDo(s, func(line string) (string, string) {
  cleanString := strings.TrimSpace(line)
  openingChar := ","
  iStart := strings.Index(cleanString, openingChar)
  key := cleanString[0:iStart]
  value := cleanString[iStart+1:]

  // How to convert to PCollection<KV<string, string>> before returning?
  return key, value
}, pcolIn)

groupedKV := beam.GroupByKey(s, pcolOut)

It fails with the following error. Any suggestions?

panic:  inserting ParDo in scope root
        creating new DoFn in scope root
        binding fn main.main.func2
        binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}

jljoyd4f (answer #1)

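To map to a KV (this example uses the Java SDK), you can apply MapElements, use into() to set the KV type, and then create a new KV.of(myKey, myValue) in the via() logic. For example, to get a KV<String, String>, you could use code like this: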

PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
        TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptors.strings()))
        .via(
            linkpage -> KV.of(dataFile, linkpage)));
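
For the Go SDK, a rough counterpart is sketched below. A DoFn that returns two values, or that calls a two-argument emitter, produces a keyed PCollection, so no explicit KV object is constructed. The name splitLine is hypothetical. It uses an emitter so that lines without a comma are skipped instead of panicking on a negative index, and it would be passed as beam.ParDo(s, splitLine, pcolIn) in place of the anonymous function in the question. The main function only exercises it outside a pipeline.

```go
package main

import (
	"fmt"
	"strings"
)

// splitLine is a hypothetical DoFn body: it splits "key, value" on the first
// comma and emits the trimmed pair. Lines without a comma are dropped.
func splitLine(line string, emit func(string, string)) {
	key, value, ok := strings.Cut(line, ",")
	if !ok {
		return // no comma: skip the record rather than panic
	}
	emit(strings.TrimSpace(key), strings.TrimSpace(value))
}

func main() {
	// Stand-in emitter that prints; in a pipeline Beam supplies the emitter.
	emit := func(k, v string) { fmt.Printf("%q -> %q\n", k, v) }
	for _, line := range []string{"Bob, cat", " Carla, bunny ", "malformed"} {
		splitLine(line, emit)
	}
}
```

Trimming the value here means the downstream step no longer has to strip the leading space left after the comma.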

jq6vz3qz (answer #2)

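Perhaps the problem is the iterator type in your next ParDo. The output of GroupByKey is a CoGBK<string,string>, so the DoFn that consumes it must take the key plus an iterator function, func(*string) bool, rather than two strings.
Try this code: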

pcolIn := beam.CreateList(s, []string{"Bob, cat",
    "Bob, dog",
    "Carla, cat",
    "Carla, bunny",
    "Doug, horse",
})

pcolOut := beam.ParDo(s, func(line string) (string, string) {
    cleanString := strings.TrimSpace(line)
    openingChar := ","
    iStart := strings.Index(cleanString, openingChar)
    key := cleanString[0:iStart]
    value := cleanString[iStart+1:]

    // How to convert to PCollection<KV<string, string>> before returning?
    return key, value
}, pcolIn)

groupedKV := beam.GroupByKey(s, pcolOut)

beam.ParDo0(s, func(key string, iter func(*string) bool) {
    vals := []string{}
    val := ""
    for iter(&val) {
        vals = append(vals, strings.TrimSpace(val))
    }
    fmt.Println(key, vals)
}, groupedKV)
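
The consuming DoFn can also be written as a named function, which makes it possible to try it outside a pipeline. The sketch below uses the hypothetical names formatPets and sliceIter. sliceIter only imitates the iterator that Beam passes for a grouped value. The values are sorted because the order of values within a group is not guaranteed.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatPets is a hypothetical DoFn body for the output of GroupByKey:
// it receives the key and an iterator over that key's values.
func formatPets(key string, iter func(*string) bool) string {
	var vals []string
	var val string
	for iter(&val) {
		vals = append(vals, strings.TrimSpace(val))
	}
	sort.Strings(vals) // grouped values arrive in no particular order
	return fmt.Sprintf("%s, [%s]", key, strings.Join(vals, ", "))
}

// sliceIter imitates Beam's value iterator over a fixed slice (helper for
// trying formatPets outside a pipeline only).
func sliceIter(vals []string) func(*string) bool {
	i := 0
	return func(out *string) bool {
		if i >= len(vals) {
			return false
		}
		*out = vals[i]
		i++
		return true
	}
}

func main() {
	// prints "Bob, [cat, dog]"
	fmt.Println(formatPets("Bob", sliceIter([]string{" dog", " cat"})))
}
```

In the pipeline it would be applied as beam.ParDo(s, formatPets, groupedKV), which yields a PCollection of formatted strings instead of printing inside the DoFn.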
