java 如何在我自定义的时间间隔后读取Kafka主题中的消息?

egdjgwm8  于 2023-06-04  发布在  Java
关注(0)|答案(1)|浏览(150)

我需要消费者阅读来自主题的消息不是立即,而是在一定时间间隔之后。在我的情况下,这个时间= 10秒。
我做了一个KafkaConfig,在其中我指定了属性并将它们添加到DefaultKafkaConsumerFactory中:
1.我试过了

  • factory.getContainerProperties().setIdleBetweenPolls(10000);*

1.我也试过通过

  • props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,10000);*

1.我也试过通过

  • props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,“99999999”); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,“10000”)*

但是上面的都不起作用:(
我很伤心。
我在这里发现了类似的案例,它们与我从互联网上找到的解决方案非常相似,但它们也不起作用。
下面是我的代码示例:

@Slf4j
@EnableKafka
@Configuration
@ConditionalOnProperty(
    value = "exclude.kafka",
    havingValue = "false",
    matchIfMissing = true
)
@RequiredArgsConstructor
public class MyKafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.getContainerProperties().setIdleBetweenPolls(10000);

        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchId");
//        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
//        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "999999999");
//        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

有没有人知道如何让Kafka工作(10秒后阅读消息,而不是在他们击中主题时阅读)?

fumotvh3

fumotvh31#

没有内置的消息延迟-查看有关此here.的更多信息
但是,您可以利用KafkaConsumer API中的pauseresume方法
其中声明了以下内容
Kafka支持动态控制消耗流,在以后的轮询(long)调用中,分别使用pause(Collection)和resume(Collection)暂停指定分配分区上的消耗,恢复指定暂停分区上的消耗。
在我看来,这是阅读this后的解决方案
所以Java代码或多或少会像下面这样,这是一个草图:

Set<TopicPartition> partitions = consumer.assignment();
consumer.pause(partitions);
Thread.sleep(10000); // 10 Seconds in millis
...
consumer.resume(partitions)

免责声明:我还没试过

相关问题