在同一个源上flink多个窗口

igetnqfo  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(266)

我的flink应用程序执行以下操作:
以Kafka的记录形式读取数据
8种按键组合和60秒翻滚时间窗
sink:elasticsearch
代码如下:

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));

    DataStream<Bean> input = stream.filter(...).flatMap(...)
    DataStream<Bean1> s1 = input.assignTimestampsAndWatermarks(...).keyBy(key1).window(TumblingEventTimeWindows.of(Time.seconds(60)).process()

    DataStream<Bean1> s2 = input.assignTimestampsAndWatermarks(...).keyBy(key2).window(TumblingEventTimeWindows.of(Time.seconds(60)).process()

    DataStream<Bean1> s3 = input.assignTimestampsAndWatermarks(...).keyBy(key3).window(TumblingEventTimeWindows.of(Time.seconds(60)).process()

    Then add ElasticsearchSink of each stream

但效果不好。运行一段时间会引发异常

org.apache.kafka.common.errors.DisconnectException

Kafka很好用,那有什么问题

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题