我已经启动了zookeeper和kafka服务器。我开始我的Kafka生产者发送10条消息与主题“”。然后阻止了我的Kafka制作人。现在我开始我的Kafka消费者和订阅的主题“”。我的消费者会使用我的Kafka制作人发送的10条消息,而Kafka制作人现在没有运行。我需要我的Kafka消费者应该只消费运行Kafka服务器的消息。有没有办法做到这一点?在我的消费者财产里。
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "100");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
2条答案
按热度按时间dvtswwa31#
请创建一个新主题,并将属性consumerconfig.auto\u offset\u reset\u config保留为“latest”。确保不提交偏移量。i、 我们不应该使用commitsync()
默认情况下,接收者开始使用来自每个分配分区的最后一个提交偏移量的记录。如果提交的偏移量不可用,则使用为kafkaconsumer配置的偏移量重置策略consumerconfig#auto#offset#reset#config将起始偏移量设置为分区上最早或最新的偏移量。
我认为在您的情况下,您正在提交偏移量,或者有一个committ偏移量可用于给定的主题。
2skhul332#
设置以下属性:
它告诉使用者只读取最新的消息,即在使用者启动后发布的消息。