Go语言中的Apache Beam ParDo过滤器

6ie5vjzr  于 2023-02-01  发布在  Go
关注(0)|答案(1)|浏览(144)

我是一个Python开发者,但是我想用Go语言做一个数据流管道,我找不到像Python或Java那样多的例子。
我有下面的代码,其中有一个结构的用户名和年龄。任务是增加年龄,然后过滤年龄。我发现的方法来增加年龄,但停留在过滤的一部分。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(incrementAge)
}

type user struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list user) {
    fmt.Println(list)
}

func incrementAge(list user) user {
    list.Age++
    return list
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []user{
        {"Bob", 40},
        {"Adam", 50},
        {"John", 35},
        {"Ben", 8},
    }
    initial := beam.CreateList(s, userList)

    pc := beam.ParDo(s, incrementAge, initial)

    pc1 := beam.ParDo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.ParDo0(s, printRow, pc1)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

我试着创建了一个如下的函数,但是它返回了一个布尔值,而不是一个用户对象。我知道我错过了一些简单的东西,但无法弄清楚。

func filterAge(list user) user {
    return list.Age > 40    
}

在Python中,我可以编写如下函数。

beam.Filter(lambda line: line["Age"] >= 40))
vojdkbi0

vojdkbi01#

您需要在函数中添加一个发射器来发射user:

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

正如在当前代码中所写的那样,return list.Age > 40list.Age > 40首先计算为true(布尔值),然后返回此布尔值。

相关问题