kafka消费者民意调查仅第二次获得消息

gijlo24d  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(314)

我有消费者配置:

public static Consumer<String, TransactionDataAvro> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }

他就这个题目发表了意见

this.consumer.subscribe(Collections.singletonList("fss-fsstransdata"));

每次我试图得到信息( this.consumer.poll(Duration.ofMillis(500)) ),它稳定地只返回第二个方法调用
代码

log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());

日志:

| 17:47:40.688 | main                | INFO  | IaStepsDefinitions               | COUNT:0
| 17:47:40.689 | main                | INFO  | IaStepsDefinitions               | COUNT:1
| 17:47:41.690 | main                | INFO  | IaStepsDefinitions               | COUNT:0
| 17:47:42.691 | main                | INFO  | IaStepsDefinitions               | COUNT:0
| 17:47:43.692 | main                | INFO  | IaStepsDefinitions               | COUNT:0

请向我解释为什么会这样

wko9yo5t

wko9yo5t1#

Consumer.poll() 有很多事情在幕后进行,除了实际调查数据。
试图找到小组协调人
连接到此组协调器节点
开始心跳线
加入组并由分区分配
如果找不到,则将偏移量重置为Committed或earliest/latest
去拿唱片
所有这些步骤都受传递给poll()方法的duration对象的约束,如果duration=1ms,情况会更糟。
在我看来,将此逻辑放在poll()方法中,让poll执行polling,在后台线程和/或subscribe方法中执行其余操作是误导和不正确的。
进行投票时,您不希望系统执行以下操作:

if (!updateAssignmentMetadataIfNeeded(timer)) {
                return ConsumerRecords.empty();
            }

轮询是面向客户机的逻辑,如果它得到0条记录,则表示代理为空。
如果您调用rest服务并得到空响应,您就知道服务器是空的。如果调用preparedstatement.execute(),则会得到正确的结果或异常。如果调用rabbitmq.basicget(),则得到一个空响应,这意味着队列为空。
长话短说,在你的情况下,只要增加第一次投票的超时时间,你应该马上得到更新。

相关问题