我在一个消费者组中消费两个主题(主题A和主题B)时遇到了Kafka消费者的问题。这两个主题的数据中都包含UTC时间戳,并同时发布。我面临的问题是,当一个主题被消费时,另一个主题似乎落后了,消费并没有像预期的那样同步发生。
我的目标是采用一种更并行和更高效的消费方法,两个主题在发布时都会被迅速处理。如何解决这个问题并优化这两个主题的使用?是否有我应该考虑的最佳做法或配置调整?
我正在为我的Kafka消费者使用Golang Sarama库。以下是我目前如何设置Kafka消费者的一个片段:
package kafka
import (
"context"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
type kafka_consumer_group struct {
consumer ConsumerBase
}
var kafka_controller = controllers.NewKafkaController()
func Init(group map[string]interface{}) {
cgo := &kafka_consumer_group{
consumer: ConsumerBase{
brokers: strings.Split(group["brokers"].(string), ","),
topics: strings.Split(group["topics"].(string), ","),
assignor: group["assignor"].(string),
version: group["version"].(string),
ready: make(chan bool),
group: group["name"].(string),
offsetNewest: isOffsetNewest(group["offset"].(string)),
offsetOldest: isOffsetOldest(group["offset"].(string)),
},
}
cgo.init()
}
func (kafka_consumer *kafka_consumer_group) init() {
ctx := context.Background()
zap.Debug(ctx, "kafka_cg:", "Starting a new Sarama consumer for :", kafka_consumer.consumer.group)
version, err := sarama.ParseKafkaVersion(kafka_consumer.consumer.version)
if err != nil {
zap.Fatal(ctx, "kafka_cg", "Error parsing Kafka version:", err)
}
config := sarama.NewConfig()
config.Version = version
switch kafka_consumer.consumer.assignor {
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "range":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
default:
zap.Fatal(ctx, "kafka_cg", "Unrecognized consumer group partition assignor: ", kafka_consumer.consumer.assignor)
}
if kafka_consumer.consumer.offsetNewest {
config.Consumer.Offsets.Initial = sarama.OffsetNewest
}
if kafka_consumer.consumer.offsetOldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(kafka_consumer.consumer.brokers, kafka_consumer.consumer.group, config)
if err != nil {
zap.Fatal(ctx, "kafka_cg", "Error creating consumer group client:", err)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, kafka_consumer.consumer.topics, kafka_consumer); err != nil {
zap.Error(ctx, "kafka_cg:Error from consumer: ", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
zap.Error(ctx, "kafka_cg:consumer context cancelled: ", err)
return
}
kafka_consumer.consumer.ready = make(chan bool)
}
}()
<-kafka_consumer.consumer.ready // Await till the consumer has been set up
zap.Debug(ctx, "kafka_cg", "Sarama consumer up and running!... for", "consumer_group:", kafka_consumer.consumer.group, "with_topics:", kafka_consumer.consumer.topics)
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
zap.Debug(ctx, "kafka_cg", "terminating: context cancelled")
case <-sigterm:
zap.Debug(ctx, "kafka_cg", "terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
zap.Fatal(ctx, "kafka_cg", "Error closing client: ", err)
}
zap.Debug(ctx, "kafka_cg", "consumer group", kafka_consumer.consumer.group, "closed successfully")
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (kafka_consumer *kafka_consumer_group) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(kafka_consumer.consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (kafka_consumer *kafka_consumer_group) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (kafka_consumer *kafka_consumer_group) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := tracer.WithTraceableContext(context.Background(), "kafka_cg: ConsumeClaim")
// NOTE:
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
// fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
kafka_controller.ProcessMessage(ctx, message.Topic, string(message.Value))
session.MarkMessage(message, "")
session.Commit()
}
return nil
}
任何见解,建议,或如何实现这两个主题的并行消费与同步UTC时间戳在Golang Sarama的例子将不胜感激。
1条答案
按热度按时间wbgh16ku1#
你可以尝试新的两个不同的消费者组与不同的groupID。