关于Kafka连接池的问题

zqdjd7g9  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(152)

在我目前的项目中,我需要使用Kafka来发送消息。我目前的实现方法如下:`

class KafkaHelper(metaclass=Singleton):
    def __init__(self):
        self.topic = global_config.getKafkaTopic()
        self.brokers = global_config.getKafkaBrokers()

    def send_msg(self, value):
        try:
            start_time = time.time() * 1000
            producer = KafkaProducer(bootstrap_servers=self.brokers)
            print(producer)
            # import pdb
            # pdb.set_trace()
            producer.send(self.topic, str.encode(json.dumps(value)))
        except Exception as e:
            end_time = time.time()*1000
            logger.info('Kafka send massage, failed, cost:%fms, error:%s',end_time-start_time, str(e))
        finally:
            producer.close()

`
如果这样写,每次发送消息时,都会建立连接,然后完成传输。最后,连接将被关闭,这导致显著的成本。我们能不能在项目启动时初始化一次,创建一个对象,每次都使用这个初始化的对象,这样就不需要每次都建立连接、发送消息、关闭连接。
expect:可以在项目运行时初始化一次,每次发送消息时使用这个初始化的对象

dced5bon

dced5bon1#

Kafka生产者是线程安全的。是的,您可以在应用程序的整个生命周期中重用同一个示例,甚至可以为多个主题重用同一个示例(假设您希望为所有操作使用相同的生产者配置;例如,某些主题可能使用JSON,而其他主题使用不同的格式)

相关问题