在我目前的项目中,我需要使用Kafka来发送消息。我目前的实现方法如下:`
class KafkaHelper(metaclass=Singleton):
def __init__(self):
self.topic = global_config.getKafkaTopic()
self.brokers = global_config.getKafkaBrokers()
def send_msg(self, value):
try:
start_time = time.time() * 1000
producer = KafkaProducer(bootstrap_servers=self.brokers)
print(producer)
# import pdb
# pdb.set_trace()
producer.send(self.topic, str.encode(json.dumps(value)))
except Exception as e:
end_time = time.time()*1000
logger.info('Kafka send massage, failed, cost:%fms, error:%s',end_time-start_time, str(e))
finally:
producer.close()
`
如果这样写,每次发送消息时,都会建立连接,然后完成传输。最后,连接将被关闭,这导致显著的成本。我们能不能在项目启动时初始化一次,创建一个对象,每次都使用这个初始化的对象,这样就不需要每次都建立连接、发送消息、关闭连接。
expect:可以在项目运行时初始化一次,每次发送消息时使用这个初始化的对象
1条答案
按热度按时间dced5bon1#
Kafka生产者是线程安全的。是的,您可以在应用程序的整个生命周期中重用同一个示例,甚至可以为多个主题重用同一个示例(假设您希望为所有操作使用相同的生产者配置;例如,某些主题可能使用JSON,而其他主题使用不同的格式)