如何暂停Kafka消费者?

wpcxdonn  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(1142)

我在我的框架中使用了Kafka生产者-消费者模型。在消费端消费的记录稍后被索引到elasticsearch上。这里我有一个用例,如果es关闭,我将不得不暂停kafka消费者,直到es打开,一旦它打开,我需要恢复消费者,并从我上次离开的地方消费记录。我不认为这可以通过@kafkalistener实现。谁能给我一个解决办法吗?我发现我需要为此编写自己的kafkalistener容器,但我无法正确地实现它。任何帮助都将不胜感激。

mzmfm0qo

mzmfm0qo1#

我建议您暂停consumer,为什么不能一次又一次地重试同一条消息,并在消息成功消费后提交offset呢。
例如:
为方法添加注解 @Retryable 并用try/catch阻止您的方法,并在catch块中抛出新异常。
对于listenerfactory配置添加属性:

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);
6qfn3psc

6qfn3psc2#

有几种可能的解决方案,一种简单的方法是使用kafkaconsumerapi。在kafkaconsumer中,实现跟踪主题上的位置,该位置将在下次调用poll(…)时检索。你的问题是,当你从Kafka那里得到记录后,你可能无法将它插入ElasticSearch。在这种情况下,您必须编写一个例程来重置使用者的位置,在您的情况下,它将是consumer.seek(partition,consumer.position(partition)-1)。这将使位置复位到先前的位置。此时,一个好的方法是暂停分区(这将使服务器能够进行一些资源清理),然后轮询es(通过您想要的任何机制)。一旦es可用,请致电消费者的resume,然后继续您通常的轮询插入周期。
讨论后编辑
使用指定的生命周期方法创建springbean。在bean的初始化方法中,示例化kafkaconsumer(从任何源检索consumer的配置)。从方法上启动一个线程与消费者交互并更新es,其余的设计都是按照上面的方法进行的。这是一个单一的模型。为了获得更高的吞吐量,请考虑将从kafka检索到的数据保存在内存中的小队列中,并使用一个dispatcher线程来获取消息并将其交给一个池线程来更新消息。

相关问题