Kafka消费群体失去未承诺的消息

inn6fuwd  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(398)

我使用的消费者组只有一个消费者,只有一个经纪人(docker wurstmeister图像)。在代码中决定是否提交偏移量—如果代码返回错误,则不提交消息。我需要确保系统不会丢失任何消息-即使这意味着永远重试相同的消息(目前;)。为了测试这一点,我创建了一个简单的处理程序,在“error”字符串作为消息发送给kafka的情况下,它不会提交offset。所有其他字符串都已提交。

kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited

正在运行

kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe

退货

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              13              0

所以没关系,没有滞后。现在我们传递'error'字符串来假装发生了不好的事情,并且消息没有提交:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              14              1

当前偏移量保持在正确位置+有1条滞后消息。现在,如果我们再次传递正确的消息,偏移量将移到15: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG test 0 15 15 第14条消息不会再收到了。是违约行为吗?我是否需要跟踪最后一个偏移量并通过它+1手动加载消息?我已经将commit interval设置为0,希望不会使用任何auto.commit机制。
获取/提交代码:

go func() {
    for {
        ctx := context.Background()

        m, err := mr.brokerReader.FetchMessage(ctx)
        if err != nil {
            break
        }

        if err := msgFunc(m); err != nil {
            log.Errorf("# messaging # cannot commit a message: %v", err)
            continue
        }

        // commit message if no error
        if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
            // should we do something else to just logging not committed message?
            log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
        }
    }
}()

读卡器配置:

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers:         brokers,
GroupID:         groupID,
Topic:           topic,
CommitInterval:  0,
MinBytes:        10e3,
MaxBytes:        10e6,
})

使用的库:https://github.com/segmentio/kafka-go

wfsdck30

wfsdck301#

在这里有必要了解消费者抵消的概念。对于运行消费者应用程序,它将消耗的消息的偏移量存储在内存中,而不考虑提交/取消提交偏移量,如果重新启动消费者应用程序,它将检索“当前偏移量”的偏移量以继续消耗。

xmakbtuz

xmakbtuz2#

看起来您的kafka使用者已设置为自动提交偏移量(这是默认设置)。
如果是这样的话,这可能就是为什么你的应用程序会跳过错误的消息——尽管你跳过了 CommitMessages 调用时,在后台线程上执行提交
请退房 enable.auto.commit 文件中的属性说明:https://kafka.apache.org/documentation/#newconsumerconfigs

erhoui1w

erhoui1w3#

在Kafka,你只需提交一条信息,而不是一条信息。如果我理解你的代码正确(不是一个围棋开发人员)。你只需在收到无效消息后继续。如果在无效的消息后出现一个有效的,你会再次提交抵消-我猜这不是你的意图。
只是想弄清楚提交或提交偏移量意味着什么:您的消费群体将把偏移量存储到一个专用的内部Kafka主题(或者在zookeeper上的旧Kafka版本上)。偏移量可以标识一个主题中的一个位置(或者更准确地说,在给定主题的分区上)。这意味着您只能以线性方式使用主题。
在这里,您可以看到Kafka消费端发生了什么:

您正在使用(很可能是多个)消息堆栈中的消息。您提交此主题/分区的位置(即偏移量)。所以你不能说我想再重复一次特定的信息。你能做的是一旦你发现一条无效的信息就停止消费。在这种情况下,你的问题将是:我如何摆脱这个消息。从Kafka主题中删除一条消息是很棘手的。一种常见的模式是将此消息写入某种死信主题,并与其他消费者进行处理。
希望这能让你更清楚一点。

相关问题