Kafka生产者线程,大量的线程,即使没有消息发送

bwntbbo3  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(562)

我目前分析了我的kafka producer spring boot应用程序,发现许多“kafka producer network thread”正在运行(总共47个)。它永远不会停止运行,即使没有数据发送。我的应用程序看起来有点像这样:

var kafkaSender = KafkaSender(kafkaTemplate, applicationProperties)
kafkaSender.sendToKafka(json, rs.getString("KEY"))

与Kafka森德:

@Service
class KafkaSender(val kafkaTemplate: KafkaTemplate<String, String>, val applicationProperties: ApplicationProperties) {

@Transactional(transactionManager = "kafkaTransactionManager")
fun sendToKafka(message: String, stringKey: String) {
   kafkaTemplate.executeInTransaction { kt ->
       kt.send(applicationProperties.kafka.topic, System.currentTimeMillis().mod(10).toInt(), System.currentTimeMillis().rem(10).toString(),
               message)
   }
}

companion object {
    val log = LoggerFactory.getLogger(KafkaSender::class.java)!!
}
}

因为每次我想向kafka发送一条消息时,我都会示例化一个新的kafkasender,我以为会创建一个新线程,然后将消息发送到kafka队列。目前,它看起来像一个生产者池生成,但从来没有清理,即使他们没有任何事做。
这种行为是故意的吗?
在我看来,这种行为应该与数据源池几乎相同,让线程保持一段时间的活动状态,但是当没有什么可做的时候,就把它清除掉。

d7v8vwbk

d7v8vwbk1#

使用事务时,生产者缓存会按需增长,而不会减少。
如果您正在侦听器容器(使用者)线程上生成消息;每个主题/分区/使用者组都有一个生产者。这是解决僵尸防护问题所必需的,因此,如果发生重新平衡并且分区移动到不同的示例,事务id将保持不变,以便代理可以正确地处理这种情况。
如果您不关心僵尸围栏问题(并且您可以处理重复交付),请设置 producerPerConsumerPartition 属性设置为false DefaultKafkaProducerFactory 而生产商的数量将少得多。

相关问题