我正在开发微服务,它使用来自kaffka的消息,然后处理这些消息并将输出存储到mongodb
我是新来Kafka,我遇到了一些问题,失去信息。
场景非常简单:
如果mongodb离线,microservice接收到一条消息,然后试图将输出保存到mongo,那么我得到一个错误,说mongo离线,消息丢失。
我的问题是,在这种情况下,Kafka有没有停止发送信息的机制。应该在kafka中手动提交offset吗?处理Kafka消费者错误的最佳实践是什么?
我正在开发微服务,它使用来自kaffka的消息,然后处理这些消息并将输出存储到mongodb
我是新来Kafka,我遇到了一些问题,失去信息。
场景非常简单:
如果mongodb离线,microservice接收到一条消息,然后试图将输出保存到mongo,那么我得到一个错误,说mongo离线,消息丢失。
我的问题是,在这种情况下,Kafka有没有停止发送信息的机制。应该在kafka中手动提交offset吗?处理Kafka消费者错误的最佳实践是什么?
3条答案
按热度按时间yfwxisqw1#
我认为与其手动提交,不如使用kafka流和kafka连接。管理两个系统之间的事务:ApacheKafka和mongodb可能不是那么容易,因此更好地使用已经开发和测试的工具(您可以阅读更多关于kafka connect的信息:https://kafka.apache.org/documentation/#connect, https://docs.confluent.io/current/connect/index.html)
您的场景可能是这样的:
使用kafka流处理您的消息并将结果发送到新主题(kafka流只支持一次语义)
使用kafka connect(sink connector)在mongodb中保存数据https://www.confluent.io/connector/kafka-connect-mongodb-sink/
qoefvg9y2#
对于这种情况,您应该手动提交偏移量。仅当消息处理成功时提交偏移量。你像下面这样做。但是,您应该注意到消息具有ttl,因此在ttl过去之后,消息会自动从kafka代理中删除。
ss2ws0br3#
一种方法是使用
pause
以及resume
上的方法MessageListenerContainer
(但必须使用SpringKafka>2.1.x)SpringKafka文档@kafkalistener生命周期管理
为创建的侦听器容器
@KafkaListener
注解不是应用程序上下文中的bean。相反,它们是用类型为的基础结构bean注册的KafkaListenerEndpointRegistry
. 这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何有autoStartup
设置为true
.所以自动连线
KafkaListenerEndpointRegistry
应用程序中的注册表终结点去拿那个
MessageListenerContainer
从注册 Spring Kafka文件返回具有指定id的messagelistenercontainer,如果不存在此类容器,则返回null。
参数:
id—容器的id
在
MessageListenerContainer
你可以用pause
或者resume
方法 Spring Kafka默认void pause()
在下次轮询()之前暂停此容器。
默认void resume()
如果暂停,请在下次轮询()后恢复此容器。