我是第一次使用springkafka,我用springkafka创建了一个生产者和消费者。我的kafka服务器运行在localhost上,并创建了一个名为test的主题。我无法通过简单的电话向消费者发送信息
KafkaTemplate.send(topicName,Data);
在同一个对象上调用send之后,我不得不在kafkatemplate上调用flush(),然后消费者就可以接收数据了。好吧,它很管用,非常棒。但有谁能向我解释一下幕后发生了什么?为什么需要调用flush方法。
来自官方的 Spring Kafka文件。
public void flush()
冲洗生产商。注意,只有当producerfactory提供单例生产者(例如defaultkafkaproducerfactory)时,调用此方法才有意义。
先谢谢你。
1条答案
按热度按时间mgdq6dx11#
producer的实现是异步的。消息存储在内部队列中,等待内部线程发送,这将通过潜在的批处理提高效率。
因此,当程序退出时,消息可能会留在客户机的内存中。在本例中,kafka服务器实际上并不接收这些消息。
消息将在由定义的超时时间内发送
queue.buffering.max.ms
,或其他大小/数量限制。flush
强制将发送队列中的所有消息传递到服务器。