两个流的连接不起作用

5n0oy7gb  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(377)

我试图加入两个流使用ApacheFlink流api,但什么都没有加入,我不知道在阅读文档后,我做错了什么

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola")))
            .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
    DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela")))
            .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
    DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector).
            window(GlobalWindows.create()).apply(joinFunction);
    joined.print();
    env.execute("Window");

关键功能是 myPojo.getFirst()

e4eetjau

e4eetjau1#

这个 GlobalWindows 除非您指定自定义窗口,否则窗口永远不会激发 Trigger . 在你的例子中,如果你使用 TumblingEventTimeWindows.of(Time.seconds(5)) 你应该看到结果。

相关问题