假设我有一个Kafka消费者注册了 Group Id 'G'
倾听 topic stream 'T'
. java配置如下:
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "60000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.enable", "true");
props.put("auto.offset.reset", "largest");
return new ConsumerConfig(props);
现在,我需要配置我的使用者组,如果偏移提交太旧(比如超过x秒),则丢弃它,并使用最大的偏移量(最新)。有什么办法吗?
编辑:后续问题。是否有任何方法可以通过编程方式从使用者中删除按需提交偏移量?
暂无答案!
目前还没有任何答案,快来回答吧!