我有惨叫代码:
KafkaListener(
topics = {
"#{@topic1}",
"#{@topics}"
},
containerFactory = "kafkaTopicListenerContainerFactory"
)
public void onNewConsumerRecords(List<ConsumerRecord<String, byte[]>> records, Acknowledgment acknowledgement) {
// Some logic
}
问题:
1.在轮询或执行onNewConsumerRecords.方法期间,变量records将包含来自两个主题还是仅一个主题的记录?
1.默认的主题分配策略是什么?如果它是range,并且我将并发设置为4,并且主题的分区数不同,是否可能有一些线程处于空闲或未充分利用状态?
1条答案
按热度按时间2admgd591#
它通常包含来自多个主题/分区的记录。
请参阅以下文档:
https://docs.spring.io/spring-kafka/docs/current/reference/html/#using-ConcurrentMessageListenerContainer
当监听多个主题时,预设的分割区分布可能不是您所预期的。例如,如果您有三个主题,每个主题有五个分割区,而您想要使用
concurrency=15
,则您只会看到五个作用中的用户,每个主题会指派给每个用户一个分割区。这是因为KafkaPartitionAssignor
默认为RangeAssignor
(请参阅其Javadoc)。对于此方案,您可能需要考虑改用RoundRobinAssignor
,它将分区分布在所有使用者之间。然后,为每个使用者分配一个主题或分区。要更改PartitionAssignor,您可以在提供给DefaultKafkaConsumerFactory
的属性中设置partition.assignment.strategy使用者属性(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。当使用Sping Boot 时,您可以如下指派设定策略: