apache-kafka KafkaListener侦听多个主题的行为

e5nqia27  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(348)

我有惨叫代码:

KafkaListener(
        topics = {
                "#{@topic1}",
                "#{@topics}"
        },
        containerFactory = "kafkaTopicListenerContainerFactory"
)
public void onNewConsumerRecords(List<ConsumerRecord<String, byte[]>> records, Acknowledgment acknowledgement) {
    // Some logic
}

问题:
1.在轮询或执行onNewConsumerRecords.方法期间,变量records将包含来自两个主题还是仅一个主题的记录?
1.默认的主题分配策略是什么?如果它是range,并且我将并发设置为4,并且主题的分区数不同,是否可能有一些线程处于空闲或未充分利用状态?

2admgd59

2admgd591#

它通常包含来自多个主题/分区的记录。
请参阅以下文档:
https://docs.spring.io/spring-kafka/docs/current/reference/html/#using-ConcurrentMessageListenerContainer
当监听多个主题时,预设的分割区分布可能不是您所预期的。例如,如果您有三个主题,每个主题有五个分割区,而您想要使用concurrency=15,则您只会看到五个作用中的用户,每个主题会指派给每个用户一个分割区。这是因为Kafka PartitionAssignor默认为RangeAssignor(请参阅其Javadoc)。对于此方案,您可能需要考虑改用RoundRobinAssignor,它将分区分布在所有使用者之间。然后,为每个使用者分配一个主题或分区。要更改PartitionAssignor,您可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy使用者属性(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。
当使用Sping Boot 时,您可以如下指派设定策略:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

相关问题