apache-flink:stream-join窗口未触发

b4lqfgs4  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(447)

我尝试在apache flink中加入两个流以获得一些结果。
我的项目的当前状态是,我获取twitter数据并将其Map到一个2元组中,在该元组中保存用户的语言和定义的时间窗口中tweet的总和。我做这些都是为了每种语言的推文数量和每种语言的转发量。tweet/retweet聚合在其他进程中工作正常。
我现在想得到在一个时间窗口中,转发次数占所有tweet数量的百分比。
因此,我使用以下代码:

Time windowSize = Time.seconds(15);

// Sum up tweets per language
DataStream<Tuple2<String, Integer>> tweetsLangSum = tweets
        .flatMap(new TweetLangFlatMap())
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

// Get retweets out of all tweets per language
DataStream<Tuple2<String, Integer>> retweetsLangMap = tweets
        .keyBy(new KeyByTweetPostId())
        .flatMap(new RetweetLangFlatMap());

// Sum up retweets per language
DataStream<Tuple2<String, Integer>> retweetsLangSum = retweetsLangMap
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

tweetsLangSum.join(retweetsLangSum)
            .where(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .window(TumblingEventTimeWindows.of(windowSize))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
                @Override
                public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
                    String lang = in1.f0;
                    Double percentage = (double) in1.f1 / in2.f1;
                    return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
                }
            })
            .print();

当我打印时 tweetsLangSum 或者 retweetsLangSum 产量似乎不错。我的问题是,我从未从连接中获得输出。有人知道为什么吗?或者我在聚合的第一步中使用的window函数在连接时是错误的吗?

jfewjypa

jfewjypa1#

这可能是由不同时间语义的混合造成的。这个 KeyedStream.timeWindow() 方法是一种快捷方式,它基于配置的时间特性创建窗口操作符,即,如果启用了事件时间,则创建事件时间窗口,否则创建处理时间窗口。对于连接,可以显式定义事件时间窗口。
是否启用事件时间处理?

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

相关问题