我正在Go语言中为数据流流水线编写Beam ParDo转换,作为一个DoFn。我试图找到一种方法,将在运行时计算的Map添加到每个DoFn中,但在流水线之前。使用状态API将其放入似乎不太正确,因为它是流水线持续时间内的常量数据。但我似乎无法传入一个预初始化的DoFn来完成此操作。我尝试过
type EngineMap struct {
Map map[string]string
}
type ResultProcessor struct {
engineMap EngineMap
}
... (ProcessElement defined, initialization)
processor := ResultProcessor{}
processor.engineMap.Map = make(map[string]string)
for k, v := range engines.Map {
processor.engineMap.Map[k] = v
}
register.DoFn2x1[context.Context, []byte, []string](&processor)
... (pipeline initialized, input "lines" defined)
result := beam.ParDo(s, &processor, lines)
但是当我运行这个函数时,engineMap中的map在ProcessElement()方法运行时仍然是empty,即使它不是在for
循环之后。我可以将这个数据作为一个辅助输入传递,但是对于一个在管道运行时保持恒定的相当小的map来说,这似乎是不必要的复杂,尤其是对于一个流管道。
有没有其他方法来传递数据?
2条答案
按热度按时间rt4zxlrg1#
根本原因是
engineMap
字段未导出,因此其数据无法序列化。只有导出字段(例如EngineMap
)可以序列化。这是依赖反射的“通用”编码器的属性,如JSON或Beam Schema行编码。https://beam.apache.org/documentation/programming-guide/#user-code-serializability
没有必要也不建议注册与管道中使用的DoFn相同的示例。(如果这样做可行,我们根本不需要注册)。DoFn注册应该在init块中发生,或者至少在main中调用beam.Init()之前发生。
xkftehaa2#
如果我没理解错的话,您使用的Map只是一个DoFn成员变量,它在DoFn初始化后保持不变?在这种情况下,我建议将成员变量
engineMap
设置为公共变量,以便在为bundle创建的DoFn示例中对其进行序列化和反序列化。StartBundle的元素与ProcessElement方法的元素相同。https://github.com/apache/beam/blob/b68d38e32c2aac51170da16c4d9c479420754009/sdks/go/pkg/beam/pardo.go#L240
StartBundle示例(这是一个相当大的示例,因此我建议关注单个DoFn):https://github.com/apache/beam/blob/67e6726ffeb47d2ada0122369fa230833ce0f026/sdks/go/examples/large_wordcount/large_wordcount.go#L207