如何将kafka流提交间隔与kafka流中的滚动窗口同步?

qaxu7uf2  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(238)

我的要求是从kafka流中获得聚合数据,时间间隔为1分钟,但由于提交间隔与窗口大小不同步,因此在正确聚合之前就已经提交了。
Kafka流属性:

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,//server);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ID");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60000L);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

Kafka河:

KTable<Windowed<String>, String> kTableMetric = inputStream.flatMap((key, value) -> {
            //aggregation logic
        }).groupByKey(Serialized.with(Serdes.String().getClass(), Serdes.String().getClass()))
                .windowedBy(TimeWindows.of(60000L)).reduce(Count::merge);

有人能帮我吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题