Applying a function on a time window in Flink

cl25kdpy, posted on 2021-06-25 in Flink

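I am working on a Flink project. The main idea is to read a data stream of JSON records (network logs), correlate them, and generate a new JSON record that combines information from the different inputs.
So far I can read the JSON, generate a KeyedStream (keyed by the machine that produced the log), and then generate a windowed stream with 5-second windows.
The next step is to call apply on the window and combine the information from each JSON record. I am a bit confused about how to do that.
The code I currently have is as follows: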

DataStream<Tuple2<String,JSONObject>> MetaAlert = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new generateMetaAlert());

public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {

        @Override
        public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
                Collector<Tuple2<String, JSONObject>> arg3) throws Exception {

        }
}

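The .apply(new generateMetaAlert()) part reports the following error:
The method apply(WindowFunction<Tuple2<String,JSONObject>,R,Tuple,TimeWindow>) in the type WindowedStream<Tuple2<String,JSONObject>,Tuple,TimeWindow> is not applicable for the arguments (MetaAlertGenerator.generateMetaAlert)
Is there a different way to structure the code than the one I came up with?
Thanks in advance for your help.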

taor4pac #1

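When you key the stream with keyBy(0), that is, by field position rather than with a KeySelector, the key type of a custom WindowFunction (its third type parameter) must be Tuple, because the compiler cannot determine the key's type from a field index. The window type (fourth type parameter) must also be TimeWindow rather than Window, since that is what timeWindow produces. The following code compiles without errors (I filled in the blanks with dummy code):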

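import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Test {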

    public Test() {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> events = env.readTextFile("datastream.log");

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());

    }

    public class JSONObject {
    }

    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}
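
To see why the key type differs between the two keyBy styles, here is a toy model in plain Java. It is not Flink code: UntypedKey, Keyed, and both keyBy methods are hypothetical stand-ins that only mimic the shape of the generics involved. The point is that a position-based keyBy gives the compiler nothing but an int, so the key type has to be a generic wrapper, while a selector-based keyBy lets the compiler infer the key type from the selector's return type.

```java
import java.util.function.Function;

class KeyTypeSketch {

    /** Toy stand-in for an untyped key wrapper (plays the role that Tuple plays in the answer). */
    static final class UntypedKey {
        final Object value;
        UntypedKey(Object value) { this.value = value; }
    }

    /** Toy keyed stream: K is fixed at compile time by whichever keyBy overload was called. */
    static final class Keyed<T, K> {
        private final Function<T, K> keyFn;
        Keyed(Function<T, K> keyFn) { this.keyFn = keyFn; }
        K keyOf(T element) { return keyFn.apply(element); }
    }

    /** Position-based keying: the compiler only sees an int, so K can only be UntypedKey. */
    static Keyed<Object[], UntypedKey> keyBy(int position) {
        return new Keyed<Object[], UntypedKey>(fields -> new UntypedKey(fields[position]));
    }

    /** Selector-based keying: K is inferred from the selector's return type. */
    static <T, K> Keyed<T, K> keyBy(Function<T, K> selector) {
        return new Keyed<T, K>(selector);
    }

    public static void main(String[] args) {
        Object[] event = {"host-1", "payload"};

        // The key comes back wrapped; its static type says nothing about String.
        UntypedKey wrapped = keyBy(0).keyOf(event);

        // The key comes back as String because the selector returns String.
        String typed = KeyTypeSketch.<Object[], String>keyBy(e -> (String) e[0]).keyOf(event);

        System.out.println(wrapped.value + " / " + typed);
    }
}
```

In this model, any downstream function declared with a String key would only type-check against the second overload, which mirrors the compile error in the question.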

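Alternatively, you can pass the WindowFunction as an anonymous class. With keyBy(0) the key type in the signature is still Tuple; the String key is available as the first field of that tuple (tuple.getField(0)):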

DataStream<Tuple2<String, JSONObject>> MetaAlert
        = events
        .flatMap(new JSONParser())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
                // Your code here
            }
        });

Finally, if you want to keep a separate class and also keep the key's original type, implement a KeySelector:

public class Test {

    public Test() {

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
                    @Override
                    public String getKey(Tuple2<String, JSONObject> json) throws Exception {
                        return json.f0;
                    }
                })
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}
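
The apply bodies above are left empty, so here is a minimal sketch of the kind of merging logic that could go inside one. It is plain Java with no Flink dependency: Map<String, Object> stands in for JSONObject, and the field names "message", "host", "count", and "messages" are assumptions made for illustration, not taken from the question's data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MetaAlertMergeSketch {

    /**
     * Combines all events of one window into a single "meta alert".
     * key    : the window's key (the machine that produced the logs)
     * events : the window contents, i.e. what apply receives as its Iterable
     */
    static Map<String, Object> merge(String key, Iterable<Map<String, Object>> events) {
        List<Object> messages = new ArrayList<>();
        int count = 0;
        for (Map<String, Object> event : events) {
            count++;
            // "message" is a hypothetical field; events without it are still counted.
            Object message = event.get("message");
            if (message != null) {
                messages.add(message);
            }
        }
        Map<String, Object> metaAlert = new LinkedHashMap<>();
        metaAlert.put("host", key);
        metaAlert.put("count", count);
        metaAlert.put("messages", messages);
        return metaAlert;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> window = new ArrayList<>();
        window.add(Map.of("message", "login failed"));
        window.add(Map.of("message", "port scan"));
        System.out.println(merge("host-1", window));
    }
}
```

Inside a WindowFunction, the same loop would run over the iterable argument (reading each element's f1), and the result would be emitted with collector.collect(new Tuple2<>(key, merged)).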
