如何在数据流程序中配置用户函数?

b1zrtrql  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(348)

我创建了一个具有配置的流式处理环境,并尝试在中访问此配置 open() a方法 RichMapFunction .
例子:

Configuration conf = new Configuration();
    conf.setBoolean("a", true);
    StreamExecutionEnvironment env = 
        StreamExecutionEnvironment.createLocalEnvironment(8, conf);

    DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5);
    source.map(new RichMapFunction<Integer, Integer>() {

        @Override
        public void open(Configuration parameters) throws Exception {
            boolean a = parameters.getBoolean("a", false);
            super.open(parameters);
        }

        @Override
        public Integer map(Integer value) throws Exception {
            return value;
        }
    }).print();

    env.execute();

但是,在调试 open() 方法我发现配置是空的。
我做错什么了?如何将配置正确地传递给 RichFunction 在流媒体环境中?

lbsnaicq

lbsnaicq1#

flink的数据流和数据集api共享相同的用户函数接口,如 RichMapFunction 在你的例子中。
这个 Configuration 的参数 open Flink法 RichFunction 是dataset api第一个版本的遗留版本,未在datastream api中使用。flink序列化您在 map() 呼叫并将其发送给平行工作人员。因此,可以将对象中的参数直接设置为常规字段。

相关问题