当生成给kafka的消息时,可能会出现两种错误:可重试和不可重试。在处理它们时,您应该如何区分它们?
我想异步生成记录,将其中的记录保存在另一个主题(或hbase)中 callback object
接收一个不可重试的异常,并让生产者为我处理所有接收到可重试异常的异常(最多尝试次数,当它最终到达时,将成为第一个尝试)。
我的问题是:生产商是否仍然会自己处理可检索的异常,尽管 callback object
? 因为在接口回调中说:
可重试异常(暂时的,可以通过增加#重试次数来覆盖)
代码可能是这样的吗?
producer.send(record, callback)
def callback: Callback = new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if(null != e) {
if (e == RecordTooLargeException || e == UnknownServerException || ..) {
log.error("Winter is comming")
writeDiscardRecordsToSomewhereElse
} else {
log.warn("It's no that cold") //it's retriable. The producer will keep trying by itself?
}
} else {
log.debug("It's summer. Everything is fine")
}
}
}
Kafka版本:0.10.0
任何光线都会很好!:)
1条答案
按热度按时间fruv7luv1#
正如Kafka圣经(又称Kafka权威指南)所说:
缺点是,虽然commitsync()将重试提交,直到提交成功或遇到不可重试的失败,但commitsync()不会重试。
原因是:
它不会重试,因为当commitasync()从服务器接收到响应时,可能已经有一个稍后的提交已经成功了。
假设我们发送了一个提交偏移量2000的请求。有一个临时通信问题,所以代理永远不会得到请求,因此永远不会响应。同时,我们处理了另一批并成功提交了3000。如果commita sync()现在重试以前失败的提交,那么在处理并提交了偏移量3000之后,它可能会成功提交偏移量2000。在重新平衡的情况下,这将导致更多的重复。
除此之外,您还可以创建一个递增的序列号,您可以在每次提交时递增该序列号,并将该序列号添加到回调对象中。当重试时间到来时,只需检查acc的当前值是否等于您给回调的数字。如果是这样,那么它是安全的,您可以执行提交。否则,会有一个新的提交,您不应该重试此偏移量的提交。
这似乎有很多麻烦,这是因为如果你正在考虑这个问题,你应该改变你的策略。