我需要消费者阅读来自主题的消息不是立即,而是在一定时间间隔之后。在我的情况下,这个时间= 10秒。
我做了一个KafkaConfig,在其中我指定了属性并将它们添加到DefaultKafkaConsumerFactory中:
1.我试过了
- factory.getContainerProperties().setIdleBetweenPolls(10000);*
1.我也试过通过
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,10000);*
1.我也试过通过
- props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,“99999999”); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,“10000”)*
但是上面的都不起作用:(
我很伤心。
我在这里发现了类似的案例,它们与我从互联网上找到的解决方案非常相似,但它们也不起作用。
下面是我的代码示例:
@Slf4j
@EnableKafka
@Configuration
@ConditionalOnProperty(
value = "exclude.kafka",
havingValue = "false",
matchIfMissing = true
)
@RequiredArgsConstructor
public class MyKafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setIdleBetweenPolls(10000);
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchId");
// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
// props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "999999999");
// props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
有没有人知道如何让Kafka工作(10秒后阅读消息,而不是在他们击中主题时阅读)?
1条答案
按热度按时间fumotvh31#
没有内置的消息延迟-查看有关此here.的更多信息
但是,您可以利用KafkaConsumer API中的
pause
和resume
方法其中声明了以下内容
Kafka支持动态控制消耗流,在以后的轮询(long)调用中,分别使用pause(Collection)和resume(Collection)暂停指定分配分区上的消耗,恢复指定暂停分区上的消耗。
在我看来,这是阅读this后的解决方案
所以Java代码或多或少会像下面这样,这是一个草图:
免责声明:我还没试过