sarama生产压缩主题不压缩

wj8zmpe1  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(442)

高级别问题
我在本地运行Kafka,我使用压缩的主题。当我运行命令行producer和consumer时,我可以验证是否正在进行压缩,但是当我使用sarama(“github.com/shopify/sarama”)producer时,似乎不会进行日志压缩。
验证日志压缩
首先,我使用以下命令创建了一个主题:

bin/kafka-topics.sh --zookeeper localhost:2181 \
  --create --topic andrew.topic \
  --config "cleanup.policy=compact" \
  --config "delete.retention.ms=100" \
  --config "segment.ms=100" \
  --config "min.cleanable.dirty.ratio=0.01" \
  --partitions 1 \
  --replication-factor 1

接下来,我将使用以下方法向它生成几个消息:

for i in $(seq 0 10); do \
  echo "sameKey123:differentMessage$i" | bin/kafka-console-producer.sh \
  --broker-list localhost:9091 \
  --topic andrew.topic \
  --property "parse.key=true" \
  --property "key.separator=:"; \
done

最后验证是否发生了日志压缩:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
  --topic andrew.topic \
  --property print.key=true \
  --property key.separator=" : " \
  --from-beginning

打印内容:

sameKey123 : differentMessage9
sameKey123 : differentMessage10

因此,andrew.topic主题的日志压缩正在发生。
现在用萨拉玛
现在,我使用sarama生成指向同一主题的消息,如下所示:

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
)

func main() {
    sendMessages()

}

func sendMessages() {
    producer, err := sarama.NewSyncProducer([]string{"localhost:9091"}, nil)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()

    for i := 0; i <= 10; i++ {
        pm := &sarama.ProducerMessage{
            Topic: "andrew.topic",
            Key:   sarama.StringEncoder("sameSaramaKey123"),
            Value: sarama.StringEncoder(fmt.Sprintf("differentMessage%v", i)),
        }
        _, _, err := producer.SendMessage(pm)
        if err != nil {
            panic(err)
        }
    }
}

在命令行重新启动使用者之后,我看到以下输出

sameKey123 : differentMessage9
sameKey123 : differentMessage10
sameSaramaKey123 : differentMessage0
sameSaramaKey123 : differentMessage1
sameSaramaKey123 : differentMessage2
sameSaramaKey123 : differentMessage3
sameSaramaKey123 : differentMessage4
sameSaramaKey123 : differentMessage5
sameSaramaKey123 : differentMessage6
sameSaramaKey123 : differentMessage7
sameSaramaKey123 : differentMessage8
sameSaramaKey123 : differentMessage9
sameSaramaKey123 : differentMessage10

原木在这里没有压实。无论我重新启动消费者多少次,或者用sarama日志压缩生成多少条消息,似乎都不会发生。
更奇怪
如果在用sarama生成消息之后,我在命令行生成更多的消息,那么就会发生日志压缩
在运行terminal producer和sarama producer之后,我得到以下输出

sameSaramaKey123 : differentMessage10
sameKey123 : differentMessage9
sameKey123 : differentMessage10

在终端运行producer之后,所有消息(包括sarama以前生成的消息)都会进行日志压缩。
为什么会这样?我该怎么修?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题