将全局数据或运行特定数据传输到Go中的Beam DoFn

vsmadaxz  于 2022-12-07  发布在  Go
关注(0)|答案(2)|浏览(117)

我正在Go语言中为数据流流水线编写Beam ParDo转换,作为一个DoFn。我试图找到一种方法,将在运行时计算的Map添加到每个DoFn中,但在流水线之前。使用状态API将其放入似乎不太正确,因为它是流水线持续时间内的常量数据。但我似乎无法传入一个预初始化的DoFn来完成此操作。我尝试过

type EngineMap struct {
    Map map[string]string 
}

type ResultProcessor struct {
    engineMap EngineMap
}

... (ProcessElement defined, initialization)

processor := ResultProcessor{}
processor.engineMap.Map = make(map[string]string)
for k, v := range engines.Map {
    processor.engineMap.Map[k] = v
}
register.DoFn2x1[context.Context, []byte, []string](&processor)

... (pipeline initialized, input "lines" defined)

result := beam.ParDo(s, &processor, lines)

但是当我运行这个函数时,engineMap中的map在ProcessElement()方法运行时仍然是empty,即使它不是在for循环之后。我可以将这个数据作为一个辅助输入传递,但是对于一个在管道运行时保持恒定的相当小的map来说,这似乎是不必要的复杂,尤其是对于一个流管道。
有没有其他方法来传递数据?

rt4zxlrg

rt4zxlrg1#

根本原因是engineMap字段未导出,因此其数据无法序列化。只有导出字段(例如EngineMap)可以序列化。这是依赖反射的“通用”编码器的属性,如JSON或Beam Schema行编码。
https://beam.apache.org/documentation/programming-guide/#user-code-serializability
没有必要也不建议注册与管道中使用的DoFn相同的示例。(如果这样做可行,我们根本不需要注册)。DoFn注册应该在init块中发生,或者至少在main中调用beam.Init()之前发生。

xkftehaa

xkftehaa2#

如果我没理解错的话,您使用的Map只是一个DoFn成员变量,它在DoFn初始化后保持不变?在这种情况下,我建议将成员变量engineMap设置为公共变量,以便在为bundle创建的DoFn示例中对其进行序列化和反序列化。
StartBundle的元素与ProcessElement方法的元素相同。https://github.com/apache/beam/blob/b68d38e32c2aac51170da16c4d9c479420754009/sdks/go/pkg/beam/pardo.go#L240
StartBundle示例(这是一个相当大的示例,因此我建议关注单个DoFn):https://github.com/apache/beam/blob/67e6726ffeb47d2ada0122369fa230833ce0f026/sdks/go/examples/large_wordcount/large_wordcount.go#L207

相关问题