我正在尝试用groupid编写一个kafka消费者 foo ,它订阅某个主题并从头开始读取(即使有以前的偏移量)。我试着用 Subscribe 使用重新平衡回调,但似乎从未调用过(已设置 go.application 设置)。有没有什么例子可以证明这一点?编辑:添加更多详细信息
foo
Subscribe
go.application
z4bn682m1#
本例是来自合流Kafkago github,您可能只需要设置 auto.offset.reset 至 kafka.OffsetBeginning.String() :
auto.offset.reset
kafka.OffsetBeginning.String()
package main /** * Copyright 2016 Confluent Inc. */ // consumer_example implements a consumer using the non-channel Poll() API // to retrieve messages and events. import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "os" "os/signal" "syscall" ) func main() { broker := "YOUR_BROKER" group := "YOUR_GROUP" topics := "YOUR_TOPICS" sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": group, "session.timeout.ms": 6000, "auto.offset.reset": kafka.OffsetBeginning.String()}) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) os.Exit(1) } fmt.Printf("Created Consumer %v\n", c) err = c.SubscribeTopics(topics, nil) run := true for run == true { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: ev := c.Poll(100) if ev == nil { continue } switch e := ev.(type) { case *kafka.Message: fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) if e.Headers != nil { fmt.Printf("%% Headers: %v\n", e.Headers) } case kafka.Error: // Errors should generally be considered as informational, the client will try to automatically recover fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) default: fmt.Printf("Ignored %v\n", e) } } } fmt.Printf("Closing consumer\n") c.Close() }
gzszwxb42#
我们现在开始设置 enable.auto.commit 至 false . 这样,就不会存储任何偏移量,我们从一开始就可以在每次运行中正常地消耗。
enable.auto.commit
false
2条答案
按热度按时间z4bn682m1#
本例是来自合流Kafkago github,您可能只需要设置
auto.offset.reset
至kafka.OffsetBeginning.String()
:gzszwxb42#
我们现在开始设置
enable.auto.commit
至false
. 这样,就不会存储任何偏移量,我们从一开始就可以在每次运行中正常地消耗。