如何消费Kafka主题在多个goroutines,使用萨拉玛?

igetnqfo  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(651)

我用https://github.com/shopify/sarama 与Kafka的互动。我有一个主题,例如,100个分区。我有一个应用程序,部署在一台主机上。所以,我想在多个goroutine中使用这个主题。
我看到这个例子-https://github.com/shopify/sarama/blob/master/examples/consumergroup/main.go ,其中我们可以看到,如何在特定的消费群体中创建消费者。
所以,我的问题是,我应该创造成倍的这样的消费者,还是有一些设置 Sarama ,在那里我可以设置所需数量的用户goroutine。
p、 我看到这个问题-https://github.com/shopify/sarama/issues/140 -但没有答案,如何创造 MultiConsumer .

pvcm50d1

pvcm50d11#

此示例显示了一个完全工作的控制台应用程序,该应用程序可以为主题中的所有分区使用,并为每个分区创建一个goroutine:
https://github.com/shopify/sarama/blob/master/tools/kafka-console-consumer/kafka-console-consumer.go
它链接在你在问题中发布的帖子的末尾。
它基本上创造了一个消费者:

c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)

然后获取所需主题的所有分区:

func getPartitions(c sarama.Consumer) ([]int32, error) {
    if *partitions == "all" {
        return c.Partitions(*topic)
    }
...

然后为每个分区创建一个partitionconsumer,并使用不同goroutine中的每个分区:

for _, partition := range partitionList {
    pc, err := c.ConsumePartition(*topic, partition, initialOffset)
    ....

    wg.Add(1)
    go func(pc sarama.PartitionConsumer) {
        defer wg.Done()
        for message := range pc.Messages() {
            messages <- message
        }
    }(pc)

}

相关问题