SpringKafka大批量加工

bxpogfeg  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(253)

使用SpringKafka1.0.5,我正在使用一个繁忙的主题,其中有10个分区,并发性为10。
我当前的代码基于分区id将消息添加到队列中,这两个分区id都保存在hashmap中。

@KafkaListener(topics = "${kafka.topic}")
public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) {
    //Pseudo code
    add to Hashmap<Integer, Queue<ConsumerRecord>> based on partition.
}

不幸的是,这种设计所需的处理时间是简单消耗所需时间的两倍。
我的要求是单独处理分区,但如何避免hashmap引用基于@kafkalistener的分区。
有没有更有效的方法?理想情况下,侦听器注解中的每个线程都将管理自己的列表。有没有一种方法可以做到这一点而不需要交叉引用,比如上面提到的基于分区id的hashmap?

mrfwxfqh

mrfwxfqh1#

考虑申报几个 @KafkaListener 方法来创建所需的每个分区。为此,您应该使用 topicPartitions 属性而不是 topics :

/**
 * Used to add topic/partition information to a {@code KafkaListener}.
 *
 */
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicPartition {

相关问题