因此,我尝试将kafka用于我的应用程序,其中生产者将操作记录到kafka mq中,消费者从mq中读取操作。由于我的应用程序处于go中,因此我使用shopify sarama使这成为可能。
现在,我可以读取mq并使用
fmt.Printf
然而,我真的希望错误处理比控制台打印更好,我愿意付出额外的努力。
用户连接的当前代码:
mqCfg := sarama.NewConfig()
master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
panic(err) // Don't want to panic when error occurs, instead handle it
}
以及信息处理:
go func() {
defer wg.Done()
for message := range consumer.Messages() {
var msgContent Message
_ = json.Unmarshal(message.Value, &msgContent)
fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
}
}()
我的问题(我不熟悉测试Kafka,也不熟悉一般的Kafka):
在上面的程序中哪里会发生错误,以便我可以处理它们?任何样本代码将是伟大的我开始。我能想到的错误情况是当msgcontent在json中实际上不包含任何类型的contentid字段时。
在kafka中,是否存在这样的情况:消费者试图读取当前偏移量,但由于某种原因无法读取(即使json格式良好)?我的消费者是否可以回溯到在失败的偏移量读取上说x步,然后重新处理偏移量?还是有更好的办法?再说一遍,这些情况会是什么?
我乐于阅读和尝试。
1条答案
按热度按时间kb5ga3dv1#
关于1)检查我在下面记录错误消息的位置。这或多或少是我会做的。
关于2)我不知道如何在一个主题上后退。它很有可能只是一次又一次地创建一个消费者,每次它的起始偏移量减一。但我不建议这样做,因为你很可能会一遍又一遍地重播同样的信息。我建议你经常保存你的补偿,这样如果情况不好你就可以恢复。
下面是我认为可以解决大多数问题的代码。我没试过编译这个。SaramaAPI最近一直在变化,所以api目前可能有点不同。