我在我的项目中使用spring-kafka,因为在基于spring的项目中使用kafka消息似乎是一种自然的选择。要使用消息,我可以使用 MessageListener
接口。斯普林·Kafka在内心里谨慎地召唤着我 onMessage
方法。
但是,在我的设置中,我更喜欢显式地轮询新消息并按顺序处理它们(这将花费几秒钟)。作为一个解决办法,我可能只是阻止我的 onMessage
实现,或在内部缓冲消息。然而,这似乎与Kafka的核心思想背道而驰。
Kafka的设计使得消费者必须投票寻找符合我要求的新消息。有没有一种方法可以利用SpringKafka的“自然”工作流?
我应该避免在这个用例中使用springkafka吗?
这个 KafkaConsumer
文件说明:
对于消息处理时间变化不可预测的用例,这两个选项都不够。处理这些情况的建议方法是将消息处理移到另一个线程,这样就允许使用者在处理器仍在工作时继续调用poll。必须注意确保承诺的抵消不会超过实际头寸。通常,只有在线程处理完记录之后,才必须禁用记录的自动提交和手动提交已处理的偏移量(取决于所需的传递语义)。还要注意,您需要暂停分区,以便在线程处理完之前返回的记录之前,不会从poll接收到新记录。
相关问题:https://github.com/spring-projects/spring-kafka/issues/195
1条答案
按热度按时间shstlldc1#
必须不断轮询消费者的问题现在已经解决了(kip-62在0.10.1.x中),因此这不再是问题(只要您不超过
max.poll.interval.ms
)默认为5分钟,但可以增加。但是,如果您想自己投票,您仍然可以使用spring-kafka(例如,如果您使用boot,就可以获得spring-boot自动配置),但是您可以获得
Consumer
从DefaultKafkaConsumerFactory
以及poll()
它是直接的。