我的项目将apachekafka作为一种基于jms的消息传递方法的潜在替代品。为了使这个转换尽可能顺利,如果替换队列系统(kafka)有一个异步订阅机制(类似于我们当前项目使用的jms机制)是理想的 MessageListener
以及 MessageConsumer
订阅主题并接收异步通知。我不太在乎kafka是否严格遵守jmsapi,但是相反地,如果不需要的话,我宁愿不重新设计发布-订阅通知类的整个套件。
我可以找到各种kafkaconsumer pollingexamples,但是到目前为止还没有找到任何客户机通过异步通知收到新消息的示例。
有人知道kafka的当前版本(截至本文发布时为0.10.2)是否提供了这样一个api,或者我一直在尝试使用轮询重写遗留代码吗?
3条答案
按热度按时间w8biq8rn1#
kafka客户端只提供按需池机制,但您可以使用springkafka。它提供
MessageListener
接口和KafkaListener
注解和类似内容。参见文档。alen0pnh2#
如果在消耗完所有可用消息后可以接受一点延迟,可以使用计时器并调用consumer.poll(0),它会立即返回可用消息。在消耗完这些信息之后,您可以使用相同的可接受延迟(比如100ms)再次设置计时器。
当吞吐量较低时,批将很小,这种延迟将更频繁地干预。但是,由于场景是异步的,因此延迟也不是很关键。不管怎样,你永远都不知道消息何时到达。
当生产量很高时,批量就会很大。对于新的kafka使用者,fetch.max.bytes的默认值是52428800。与处理大量消息所需的时间相比,额外的延迟相对较小。
您可以将其封装在一个小组件中,为其提供与当前mbean处理程序相对应的函数。
k3bvogb13#
您可能需要试用融合的kafka jms客户机。
http://docs.confluent.io/3.2.0/clients/kafka-jms-client/docs/index.html
kafka jms客户机是kafka生产者/消费者api之上的jmsapi Package 器,因此它具有所有标准jms1.1接口。这是一个企业(订阅)功能,但如果您下载confluent enterprise,您可以免费试用30天。
https://www.confluent.io/download/