我正在使用Apache Beam Go SDK,很难获得正确格式的PCollection,以便按键进行分组/组合。
在字符串的PCollection中,每个键有多个记录,如下所示:
Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse
我想使用GroupByKey和CombinePerKey,这样我就可以像这样聚合每个人的宠物:
Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]
如何将PCollection转换为PCollection〈KV〈string,string〉〉?
他们提到了类似于here的东西,但是没有包括聚合字符串值的代码。
我可以使用ParDo来获取字符串键和字符串值,如下所示,但我不知道如何转换为GroupPerKey输入所需的KV〈string,string〉或CoGBK〈string,string〉格式。
pcolOut := beam.ParDo(s, func(line string) (string, string) {
cleanString := strings.TrimSpace(line)
openingChar := ","
iStart := strings.Index(cleanString, openingChar)
key := cleanString[0:iStart]
value := cleanString[iStart+1:]
// How to convert to PCollection<KV<string, string>> before returning?
return key, value
}, pcolIn)
groupedKV := beam.GroupByKey(s, pcolOut)
它失败并出现以下错误。有什么建议吗?
panic: inserting ParDo in scope root
creating new DoFn in scope root
binding fn main.main.func2
binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}
2条答案
按热度按时间jljoyd4f1#
要Map到KV,可以应用MapElements并使用into()设置KV类型,然后在via()逻辑中创建一个新的
KV.of(myKey, myValue)
。例如,要获得KV<String,String>
,可以使用如下代码:jq6vz3qz2#
也许你把下一个pardo iter类型
测试此代码