Kafka主题压缩

omqzjyyz  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(285)

我用过 kafka-topics.bat --zookeeper localhost:2181 --alter --topic test --config cleanup.policy=compact delete config min.cleanable.dirty.ratio=0.01 --config segment.ms=100 --config delete.retention.ms=100 来压缩我的主题。我已经用相同的密钥发送了2000条消息。当我使用这些消息时,我会分别收到每条消息,而不是一条压缩的消息。

5cnsuln7

5cnsuln71#

您所指的压缩设置与您使用kafka客户端的消息方式无关。请查看此处的官方文档以了解更多详细信息。
如果要控制客户端如何使用消息,则必须使用客户端配置属性配置客户端。
考虑这样一种情况:您将主题合并300毫秒,并接收一组消息(consumerrecords),然后可以对这些消息进行迭代以独立地处理每条消息。

while(true) {
   ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(300);
       if(records.count() > 0) {
          for(ConsumerRecord<String, JsonNode> record: records) {
             if(counter % 500 == 0) {
                 log.info("Record recovered, groupId: {}, topicName: {}, key: {}, value: {} , offset: {}",
                 this.groupId, this.topicNames, record.key(), record.value(), record.offset());
                    }
                }
            }
        }

相关问题