Go语言 无法关闭批处理:[7]请求超时:请求超出了用户在请求中指定的时间限制

vxf3dgd4  于 2023-09-28  发布在  Go
关注(0)|答案(1)|浏览(105)

这是我第一次使用Kafka-go。我在一台干净的机器上本地安装了所有东西,并使用了前两个代码示例:产生消息和使用消息如下。请注意,我只是从上述链接复制粘贴代码。

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

func main() {

    ProduceMessages()
    ConsumeMessages()
}

func ProduceMessages() {
    // to produce messages
    topic := "my-topic"
    partition := 0

    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
    _, err = conn.WriteMessages(
        kafka.Message{Value: []byte("one!")},
        kafka.Message{Value: []byte("two!")},
        kafka.Message{Value: []byte("three!")},
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    if err := conn.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }
}

func ConsumeMessages() {
    // to consume messages
    topic := "my-topic"
    partition := 0

    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    conn.SetReadDeadline(time.Now().Add(10 * time.Second))
    batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    b := make([]byte, 10e3) // 10KB max per message
    for {
        n, err := batch.Read(b)
        if err != nil {
            break
        }
        fmt.Println(string(b[:n]))
    }

    if err := batch.Close(); err != nil {
        log.Fatal("failed to close batch:", err)
    }

    if err := conn.Close(); err != nil {
        log.Fatal("failed to close connection:", err)
    }
}

然而,每当试图关闭批处理会话时,我都会得到以下结果(在等待了几秒钟之后):

one!
two!
three!

到目前为止,这很好,但这总是伴随着以下错误。

2022/12/10 10:35:50 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request
exit status 1

为什么无法关闭批处理(为什么超时)?
PS>请注意,我无法添加与现有confluent-kafka-go标记不同的Kafka-go

zbq4xfa0

zbq4xfa01#

出现此错误的原因是您的batch的大小小于10kb。尝试减少此行中第一个参数的值:

batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

相关问题