我目前分析了我的kafka producer spring boot应用程序,发现许多“kafka producer network thread”正在运行(总共47个)。它永远不会停止运行,即使没有数据发送。我的应用程序看起来有点像这样:
var kafkaSender = KafkaSender(kafkaTemplate, applicationProperties)
kafkaSender.sendToKafka(json, rs.getString("KEY"))
与Kafka森德:
@Service
class KafkaSender(val kafkaTemplate: KafkaTemplate<String, String>, val applicationProperties: ApplicationProperties) {
@Transactional(transactionManager = "kafkaTransactionManager")
fun sendToKafka(message: String, stringKey: String) {
kafkaTemplate.executeInTransaction { kt ->
kt.send(applicationProperties.kafka.topic, System.currentTimeMillis().mod(10).toInt(), System.currentTimeMillis().rem(10).toString(),
message)
}
}
companion object {
val log = LoggerFactory.getLogger(KafkaSender::class.java)!!
}
}
因为每次我想向kafka发送一条消息时,我都会示例化一个新的kafkasender,我以为会创建一个新线程,然后将消息发送到kafka队列。目前,它看起来像一个生产者池生成,但从来没有清理,即使他们没有任何事做。
这种行为是故意的吗?
在我看来,这种行为应该与数据源池几乎相同,让线程保持一段时间的活动状态,但是当没有什么可做的时候,就把它清除掉。
1条答案
按热度按时间d7v8vwbk1#
使用事务时,生产者缓存会按需增长,而不会减少。
如果您正在侦听器容器(使用者)线程上生成消息;每个主题/分区/使用者组都有一个生产者。这是解决僵尸防护问题所必需的,因此,如果发生重新平衡并且分区移动到不同的示例,事务id将保持不变,以便代理可以正确地处理这种情况。
如果您不关心僵尸围栏问题(并且您可以处理重复交付),请设置
producerPerConsumerPartition
属性设置为falseDefaultKafkaProducerFactory
而生产商的数量将少得多。