我正在尝试建立一个数据流处理系统,我想把我的传感器在最后一分钟发送的数据聚合起来。传感器将数据发送到Kafka服务器 sensor
它被Flink消耗掉了。
我使用带有kafkapython库的python生成器,并以json格式发送数据。在json中有一个字段 sent
包含时间戳。在python中,使用 int(datetime.now().timestamp())
我知道它以秒为单位返回一个unix的时间戳。
问题是系统什么也不打印!我做错什么了?
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// setting topic and processing the stream from Sensor
DataStream<Sensor> sensorStream = env.addSource(new FlinkKafkaConsumer010<>("sensor", new SimpleStringSchema(), properties))
.flatMap(new ParseSensor()) // parsing into a Sensor object
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Sensor>() {
@Override
public long extractAscendingTimestamp(Sensor element) {
return element.getSent()*1000;
}
});
sensorStream.keyBy(meanSelector).window(TumblingEventTimeWindows.of(Time.minutes(1))).apply(new WindowMean(dataAggregation)).print();
在我试图使这项工作,我找到了方法 .timeWindow()
而不是 .window()
真管用!更确切地说,我写道 .timeWindow(Time.minutes(1))
. n、 b:尽管Flink跑了5分钟,Windows还是只印了1次!
暂无答案!
目前还没有任何答案,快来回答吧!