apache flink 1.2-窗口聚合的时间戳和水印分配

4sup72z8  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(212)

我正在尝试建立一个数据流处理系统,我想把我的传感器在最后一分钟发送的数据聚合起来。传感器将数据发送到Kafka服务器 sensor 它被Flink消耗掉了。
我使用带有kafkapython库的python生成器,并以json格式发送数据。在json中有一个字段 sent 包含时间戳。在python中,使用 int(datetime.now().timestamp()) 我知道它以秒为单位返回一个unix的时间戳。
问题是系统什么也不打印!我做错什么了?

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// setting topic and processing the stream from Sensor
DataStream<Sensor> sensorStream = env.addSource(new FlinkKafkaConsumer010<>("sensor", new SimpleStringSchema(), properties))
                                     .flatMap(new ParseSensor()) // parsing into a Sensor object
                                     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Sensor>() {
                                         @Override
                                         public long extractAscendingTimestamp(Sensor element) {
                                             return element.getSent()*1000;
                                         }
                                     });
sensorStream.keyBy(meanSelector).window(TumblingEventTimeWindows.of(Time.minutes(1))).apply(new WindowMean(dataAggregation)).print();

在我试图使这项工作,我找到了方法 .timeWindow() 而不是 .window() 真管用!更确切地说,我写道 .timeWindow(Time.minutes(1)) . n、 b:尽管Flink跑了5分钟,Windows还是只印了1次!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题