spring kafka kafkatemplate.flush()吗?

pu3pd22g  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(566)

我是第一次使用springkafka,我用springkafka创建了一个生产者和消费者。我的kafka服务器运行在localhost上,并创建了一个名为test的主题。我无法通过简单的电话向消费者发送信息

KafkaTemplate.send(topicName,Data);

在同一个对象上调用send之后,我不得不在kafkatemplate上调用flush(),然后消费者就可以接收数据了。好吧,它很管用,非常棒。但有谁能向我解释一下幕后发生了什么?为什么需要调用flush方法。
来自官方的 Spring Kafka文件。

public void flush()

冲洗生产商。注意,只有当producerfactory提供单例生产者(例如defaultkafkaproducerfactory)时,调用此方法才有意义。
先谢谢你。

mgdq6dx1

mgdq6dx11#

producer的实现是异步的。消息存储在内部队列中,等待内部线程发送,这将通过潜在的批处理提高效率。
因此,当程序退出时,消息可能会留在客户机的内存中。在本例中,kafka服务器实际上并不接收这些消息。
消息将在由定义的超时时间内发送 queue.buffering.max.ms ,或其他大小/数量限制。 flush 强制将发送队列中的所有消息传递到服务器。

相关问题