我是一个Python开发者,但是我想用Go语言做一个数据流管道,我找不到像Python或Java那样多的例子。
我有下面的代码,其中有一个结构的用户名和年龄。任务是增加年龄,然后过滤年龄。我发现的方法来增加年龄,但停留在过滤的一部分。
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
func init() {
beam.RegisterFunction(incrementAge)
}
type user struct {
Name string
Age int
}
func printRow(ctx context.Context, list user) {
fmt.Println(list)
}
func incrementAge(list user) user {
list.Age++
return list
}
func main() {
flag.Parse()
beam.Init()
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
var userList = []user{
{"Bob", 40},
{"Adam", 50},
{"John", 35},
{"Ben", 8},
}
initial := beam.CreateList(s, userList)
pc := beam.ParDo(s, incrementAge, initial)
pc1 := beam.ParDo(s, func(row user, emit func(user)) {
emit(row)
}, pc)
beam.ParDo0(s, printRow, pc1)
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}
我试着创建了一个如下的函数,但是它返回了一个布尔值,而不是一个用户对象。我知道我错过了一些简单的东西,但无法弄清楚。
func filterAge(list user) user {
return list.Age > 40
}
在Python中,我可以编写如下函数。
beam.Filter(lambda line: line["Age"] >= 40))
1条答案
按热度按时间vojdkbi01#
您需要在函数中添加一个发射器来发射user:
正如在当前代码中所写的那样,
return list.Age > 40
list.Age > 40
首先计算为true(布尔值),然后返回此布尔值。