我正在构建一个Go应用程序,它从Kafka主题中读取消息并将其写入PostgreSQL数据库。
我设置了一个循环,它使用kafka.Reader从Kafka读取消息,并使用sql.DB将它们插入数据库。如果在阅读消息或将其插入数据库时出错,我会记录错误并继续处理下一条消息。
但是,我不确定如何处理在将数据插入PostgreSQL数据库后提交Kafka消息时发生的错误。具体来说,如果手动提交导致错误,我应该怎么做?我应该重试提交操作吗?我应该记录错误并继续下一条消息吗?处理这些类型错误的最佳实践是什么?
for {
kafkaMessage, err := kafkaReader.ReadMessage(context.Background())
if err != nil {
fmt.Printf("Failed to read message from Kafka: %s\n", err)
continue
}
_, err = db.Exec("INSERT INTO mytable (payload) VALUES ($1)", kafkaMessage.Value)
if err != nil {
fmt.Printf("Failed to insert payload into database: %s\n", err)
continue
}
// What should I do if the commit operation fails?
err = kafkaReader.CommitMessages(context.Background(), kafkaMessage)
if err != nil {
// What's the best practice for handling this error?
}
}
字符串
2条答案
按热度按时间flvlnr441#
当遇到错误时,只需将
continue
循环到for
循环的下一次迭代。如果由于任何原因未能提交Kafka消息,Kafka将在下一个
reader.ReadMessage(ctx)
中再次返回相同的消息。但是要确保你的代码不会继续徒劳地做同样失败的工作很多次,耗尽你的资源,用同样的错误消息淹没日志,等等。在每个错误后使用简单的
sleep
,或者如果确实需要,为您的功能使用断路器逻辑。字符串
ymdaylpp2#
一般来说,业务逻辑应该依赖于数据最后一级的一致性。对于您的情况,我假设数据将在数据库中持久化,这是唯一重要的事情,那么您应该围绕数据库设计一个一致性模型,找到数据属性并设计一个适当的业务流程,以帮助您保证数据库的最终一致性。
对于你的问题,你是否处理错误真的不重要,在Kafka发生了一些意想不到的事情,你忽略它或者什么,如果提交(实际上)成功,Kafka应该继续下一条消息,或者如果提交(实际上)失败,停留在当前偏移量。这就是所谓的
at-least-once
传递,因此您的业务逻辑应该正确地处理重复的消息。如果您想要
exactly-once
交付,那就麻烦多了,我不建议业务逻辑依赖于精确一次交付。例如,如果数据库插入了记录,但当响应传输回应用程序时网络失败,会发生什么?您会假设网络在数据库提交之后还是在数据库提交之前丢失?当你的请求到达Kafka服务器时,Kafka可以保证只传递一次,但是在Kafka服务器的范围之外,它不能帮助你太多。