Go语言:同时消费两个带UTC时间戳的主题时,Kafka消费者出现滞后的问题

vohkndzv  于 2023-09-28  发布在  Go
关注(0)|答案(1)|浏览(111)

我在一个消费者组中消费两个主题(主题A和主题B)时遇到了Kafka消费者的问题。这两个主题的数据中都包含UTC时间戳,并同时发布。我面临的问题是,当一个主题被消费时,另一个主题似乎落后了,消费并没有像预期的那样同步发生。
我的目标是采用一种更并行、更高效的消费方式,使两个主题的消息在发布后都能被迅速处理。如何解决这个问题并优化这两个主题的消费?是否有我应该考虑的最佳实践或配置调整?
我的Kafka消费者使用的是Golang的Sarama库。以下是我目前设置Kafka消费者的代码片段:

package kafka

import (
    "context"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
    "github.com/Shopify/sarama"
)

// kafka_consumer_group wraps a ConsumerBase and carries the Setup, Cleanup
// and ConsumeClaim methods; init passes it to client.Consume as the
// consumer-group handler.
type kafka_consumer_group struct {
    // consumer holds the connection settings (brokers, topics, group,
    // version, assignor, initial-offset flags) and the ready channel.
    consumer ConsumerBase
}

// kafka_controller is the package-level controller; ConsumeClaim hands every
// consumed message to its ProcessMessage method.
var kafka_controller = controllers.NewKafkaController()

// Init builds a kafka_consumer_group from the supplied settings map and
// starts it by calling its init method.
//
// The map must hold string values under the keys "brokers", "topics",
// "assignor", "version", "name" and "offset"; "brokers" and "topics" are
// split on commas. A missing key or a non-string value makes the
// corresponding type assertion panic.
func Init(group map[string]interface{}) {
    brokerList := strings.Split(group["brokers"].(string), ",")
    topicList := strings.Split(group["topics"].(string), ",")
    assignorName := group["assignor"].(string)
    kafkaVersion := group["version"].(string)
    groupName := group["name"].(string)
    initialOffset := group["offset"].(string)

    cg := &kafka_consumer_group{
        consumer: ConsumerBase{
            brokers:      brokerList,
            topics:       topicList,
            assignor:     assignorName,
            version:      kafkaVersion,
            ready:        make(chan bool),
            group:        groupName,
            offsetNewest: isOffsetNewest(initialOffset),
            offsetOldest: isOffsetOldest(initialOffset),
        },
    }
    cg.init()
}

// init builds the Sarama configuration for this consumer group, runs the
// consume loop in a background goroutine, and then blocks until SIGINT or
// SIGTERM arrives (or the context is cancelled). On shutdown it cancels the
// context, waits for the consume loop to exit and closes the client.
//
// An unparsable Kafka version, an unknown assignor, a failure to create the
// consumer group client and a failure to close it are all reported through
// zap.Fatal.
func (kafka_consumer *kafka_consumer_group) init() {
    ctx := context.Background()
    zap.Debug(ctx, "kafka_cg:", "Starting a new Sarama consumer for :", kafka_consumer.consumer.group)
    version, err := sarama.ParseKafkaVersion(kafka_consumer.consumer.version)
    if err != nil {
        zap.Fatal(ctx, "kafka_cg", "Error parsing Kafka version:", err)
    }
    config := sarama.NewConfig()
    config.Version = version
    switch kafka_consumer.consumer.assignor {
    case "sticky":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
    case "roundrobin":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    case "range":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    default:
        zap.Fatal(ctx, "kafka_cg", "Unrecognized consumer group partition assignor: ", kafka_consumer.consumer.assignor)
    }
    if kafka_consumer.consumer.offsetNewest {
        config.Consumer.Offsets.Initial = sarama.OffsetNewest
    }
    if kafka_consumer.consumer.offsetOldest {
        config.Consumer.Offsets.Initial = sarama.OffsetOldest
    }
    ctx, cancel := context.WithCancel(context.Background())
    client, err := sarama.NewConsumerGroup(kafka_consumer.consumer.brokers, kafka_consumer.consumer.group, config)
    if err != nil {
        zap.Fatal(ctx, "kafka_cg", "Error creating consumer group client:", err)
    }

    // Capture the channel before the goroutine starts: the consume loop may
    // replace the field, and reading the field concurrently from this
    // goroutine would be a data race.
    ready := kafka_consumer.consumer.ready

    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            if err := client.Consume(ctx, kafka_consumer.consumer.topics, kafka_consumer); err != nil {
                zap.Error(ctx, "kafka_cg:Error from consumer: ", err)
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctxErr := ctx.Err(); ctxErr != nil {
                zap.Error(ctx, "kafka_cg:consumer context cancelled: ", ctxErr)
                return
            }
            // Re-arm the ready channel only when Setup has already closed it.
            // If Consume failed before Setup ran, the channel is still open
            // and must be kept, otherwise the goroutine waiting on it below
            // would never be released by a later successful session.
            select {
            case <-kafka_consumer.consumer.ready:
                kafka_consumer.consumer.ready = make(chan bool)
            default:
            }
        }
    }()
    <-ready // Await till the consumer has been set up
    zap.Debug(ctx, "kafka_cg", "Sarama consumer up and running!... for", "consumer_group:", kafka_consumer.consumer.group, "with_topics:", kafka_consumer.consumer.topics)
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    // Release the signal registration once this function returns.
    defer signal.Stop(sigterm)
    select {
    case <-ctx.Done():
        zap.Debug(ctx, "kafka_cg", "terminating: context cancelled")
    case <-sigterm:
        zap.Debug(ctx, "kafka_cg", "terminating: via signal")
    }
    cancel()
    wg.Wait()
    if err = client.Close(); err != nil {
        zap.Fatal(ctx, "kafka_cg", "Error closing client: ", err)
    }
    zap.Debug(ctx, "kafka_cg", "consumer group", kafka_consumer.consumer.group, "closed successfully")
}

// Setup is invoked when a new session starts, ahead of ConsumeClaim. It
// signals readiness by closing the consumer's ready channel and always
// returns nil.
func (kafka_consumer *kafka_consumer_group) Setup(_ sarama.ConsumerGroupSession) error {
    readyCh := kafka_consumer.consumer.ready
    close(readyCh)
    return nil
}

// Cleanup is invoked when a session ends, after every ConsumeClaim goroutine
// has exited. There is nothing to release here, so it always returns nil.
func (kafka_consumer *kafka_consumer_group) Cleanup(_ sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim runs the consumer loop over the claim's Messages() channel.
// Every message is passed to kafka_controller.ProcessMessage with its topic
// and its value converted to a string, then marked and committed.
//
// The loop returns nil when the Messages() channel is closed or when the
// session context is done. Sarama's handler contract asks ConsumeClaim to
// return once session.Context() is done so that a rebalance is not held up.
func (kafka_consumer *kafka_consumer_group) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    ctx := tracer.WithTraceableContext(context.Background(), "kafka_cg: ConsumeClaim")

    // NOTE:
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
    for {
        select {
        case message, ok := <-claim.Messages():
            if !ok {
                // The claim's message channel was closed; nothing more to read.
                return nil
            }
            kafka_controller.ProcessMessage(ctx, message.Topic, string(message.Value))
            session.MarkMessage(message, "")
            // NOTE(review): Sarama documents Commit as a blocking synchronous
            // call; issuing it for every message is a likely throughput
            // bottleneck. Kept to preserve the existing commit timing —
            // consider relying on MarkMessage plus auto-commit instead.
            session.Commit()
        case <-session.Context().Done():
            // The session is ending (rebalance or shutdown); stop promptly.
            return nil
        }
    }
}

如能提供任何见解、建议,或者在Golang Sarama中实现这两个主题(带同步UTC时间戳)并行消费的示例,我将不胜感激。

wbgh16ku

wbgh16ku1#

你可以尝试新建两个消费者组,并为它们设置不同的groupID。

相关问题