高级别问题
我在本地运行Kafka,我使用压缩的主题。当我运行命令行producer和consumer时,我可以验证是否正在进行压缩,但是当我使用sarama(“github.com/shopify/sarama”)producer时,似乎不会进行日志压缩。
验证日志压缩
首先,我使用以下命令创建了一个主题:
bin/kafka-topics.sh --zookeeper localhost:2181 \
--create --topic andrew.topic \
--config "cleanup.policy=compact" \
--config "delete.retention.ms=100" \
--config "segment.ms=100" \
--config "min.cleanable.dirty.ratio=0.01" \
--partitions 1 \
--replication-factor 1
接下来,我将使用以下方法向它生成几个消息:
for i in $(seq 0 10); do \
echo "sameKey123:differentMessage$i" | bin/kafka-console-producer.sh \
--broker-list localhost:9091 \
--topic andrew.topic \
--property "parse.key=true" \
--property "key.separator=:"; \
done
最后验证是否发生了日志压缩:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
--topic andrew.topic \
--property print.key=true \
--property key.separator=" : " \
--from-beginning
打印内容:
sameKey123 : differentMessage9
sameKey123 : differentMessage10
因此,andrew.topic主题的日志压缩正在发生。
现在用萨拉玛
现在,我使用sarama生成指向同一主题的消息,如下所示:
package main
import (
"fmt"
"github.com/Shopify/sarama"
"os"
"os/signal"
)
func main() {
sendMessages()
}
func sendMessages() {
producer, err := sarama.NewSyncProducer([]string{"localhost:9091"}, nil)
if err != nil {
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
panic(err)
}
}()
for i := 0; i <= 10; i++ {
pm := &sarama.ProducerMessage{
Topic: "andrew.topic",
Key: sarama.StringEncoder("sameSaramaKey123"),
Value: sarama.StringEncoder(fmt.Sprintf("differentMessage%v", i)),
}
_, _, err := producer.SendMessage(pm)
if err != nil {
panic(err)
}
}
}
在命令行重新启动使用者之后,我看到以下输出
sameKey123 : differentMessage9
sameKey123 : differentMessage10
sameSaramaKey123 : differentMessage0
sameSaramaKey123 : differentMessage1
sameSaramaKey123 : differentMessage2
sameSaramaKey123 : differentMessage3
sameSaramaKey123 : differentMessage4
sameSaramaKey123 : differentMessage5
sameSaramaKey123 : differentMessage6
sameSaramaKey123 : differentMessage7
sameSaramaKey123 : differentMessage8
sameSaramaKey123 : differentMessage9
sameSaramaKey123 : differentMessage10
原木在这里没有压实。无论我重新启动消费者多少次,或者用sarama日志压缩生成多少条消息,似乎都不会发生。
更奇怪
如果在用sarama生成消息之后,我在命令行生成更多的消息,那么就会发生日志压缩
在运行terminal producer和sarama producer之后,我得到以下输出
sameSaramaKey123 : differentMessage10
sameKey123 : differentMessage9
sameKey123 : differentMessage10
在终端运行producer之后,所有消息(包括sarama以前生成的消息)都会进行日志压缩。
为什么会这样?我该怎么修?
暂无答案!
目前还没有任何答案,快来回答吧!