我创建了一个具有配置的流式处理环境,并尝试在中访问此配置 open()
a方法 RichMapFunction
.
例子:
Configuration conf = new Configuration();
conf.setBoolean("a", true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(8, conf);
DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5);
source.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
boolean a = parameters.getBoolean("a", false);
super.open(parameters);
}
@Override
public Integer map(Integer value) throws Exception {
return value;
}
}).print();
env.execute();
但是,在调试 open()
方法我发现配置是空的。
我做错什么了?如何将配置正确地传递给 RichFunction
在流媒体环境中?
1条答案
按热度按时间lbsnaicq1#
flink的数据流和数据集api共享相同的用户函数接口,如
RichMapFunction
在你的例子中。这个
Configuration
的参数open
Flink法RichFunction
是dataset api第一个版本的遗留版本,未在datastream api中使用。flink序列化您在map()
呼叫并将其发送给平行工作人员。因此,可以将对象中的参数直接设置为常规字段。