kafka0.10.2消费者获得大量的副本

643ylb08  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(335)

我有一个相当简单的kafka设置-1个producer,1个topic,10个分区,10个kafkaconsumer都有相同的组id,都在一台机器上运行。当我处理一个文件时,生产者会快速创建3269条消息,消费者会很高兴地开始使用这些消息。在一段时间内一切正常,但到了某个时候,消费者开始消费重复品——大量重复品。事实上,看起来他们只是重新开始使用消息队列。如果让它运行很长时间,数据库将开始接收相同的数据条目6次或更多次。在对日志进行了一些测试之后,使用者似乎正在重新使用具有相同唯一消息名称的相同消息。
据我所知,没有发生重新平衡的情况。消费者没有死亡或被添加。这是同样的10个消费者,一次又一次地消费同样的3269条消息,直到我终止这个过程。如果我放任它过去,消费者将写下成千上万条记录,大量增加真正应该进入数据库的数据量。
我对Kafka还很陌生,但我有点不明白为什么会这样。我知道Kafka不能保证一次处理,我可以在这里和那里的两个副本。我有代码来防止再次保存相同的记录。但是,我不知道为什么消费者会一次又一次地重复使用队列。我知道kafka消息在被消费后不会被删除,但是如果所有的消费者都在同一个组中,偏移量应该可以防止这种情况,对吗?我了解一点补偿是如何工作的,但据我所知,如果没有重新平衡,它们就不应该被重置,对吗?据我所知,这些信息并没有超时。有没有一种方法可以让我的消费者消费一次队列中的所有信息,然后等待更多的消息,而不会永远重复消费相同的信息?
以下是我传递给生产者和消费者的属性:

Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("group.id", "MyGroup");
        props.put("num.partitions", 10);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        MyIngester ingester = new MyIngester(args[0], props);
pgvzfuti

pgvzfuti1#

对我来说,这似乎是一个问题,承认收据。尝试以下属性

props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "100");

相关问题