我的flink应用程序执行以下操作:
以Kafka的记录形式读取数据
8种按键组合和60秒翻滚时间窗
sink:elasticsearch
代码如下:
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
DataStream<Bean> input = stream.filter(...).flatMap(...)
DataStream<Bean1> s1 = input.assignTimestampsAndWatermarks(...).keyBy(key1).window(TumblingEventTimeWindows.of(Time.seconds(60)).process()
DataStream<Bean1> s2 = input.assignTimestampsAndWatermarks(...).keyBy(key2).window(TumblingEventTimeWindows.of(Time.seconds(60)).process()
DataStream<Bean1> s3 = input.assignTimestampsAndWatermarks(...).keyBy(key3).window(TumblingEventTimeWindows.of(Time.seconds(60)).process()
Then add ElasticsearchSink of each stream
但效果不好。运行一段时间会引发异常
org.apache.kafka.common.errors.DisconnectException
Kafka很好用,那有什么问题
暂无答案!
目前还没有任何答案,快来回答吧!