我的要求是从kafka流中获得聚合数据,时间间隔为1分钟,但由于提交间隔与窗口大小不同步,因此在正确聚合之前就已经提交了。
Kafka流属性:
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,//server);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ID");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60000L);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Kafka河:
KTable<Windowed<String>, String> kTableMetric = inputStream.flatMap((key, value) -> {
//aggregation logic
}).groupByKey(Serialized.with(Serdes.String().getClass(), Serdes.String().getClass()))
.windowedBy(TimeWindows.of(60000L)).reduce(Count::merge);
有人能帮我吗?
暂无答案!
目前还没有任何答案,快来回答吧!