无待定回复:consumerrecord

fjaof16o  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(364)

我正在尝试使用replyingkafkatemplate,并且断断续续地看到下面的消息。
无挂起回复:consumerrecord(主题=请求-回复主题,分区=8,偏移量=1,createtime=1544653843269,序列化键大小=-1,序列化值大小=1609,标头=记录标头(标头=[记录标头(键=kafka\u correlationid,值=[-14,65,21,-118,70,-94,72,87,-113,-91,92,72,-124,-110,-64,-94]),isreadonly=false),key=null,correlationid:[-18271255759235816475365319231847350110],可能超时,或者使用共享回复主题
它将源于下面的代码

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
  if (this.sharedReplyTopic) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug(missingCorrelationLogMessage(record, correlationId));
    }
  }
  else if (this.logger.isErrorEnabled()) {
    this.logger.error(missingCorrelationLogMessage(record, correlationId));
  }
}

但只会在无意中发生
我还将sharedreplytopic设置为false,如下所示,并尝试强制更长的超时时间

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
        replyKafkaTemplate.setSharedReplyTopic(false);
        replyKafkaTemplate.setReplyTimeout(10000);
        return replyKafkaTemplate;

我的容器如下

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(1000);
    factory.getContainerProperties().setIdleEventInterval(10000L);
    factory.setConcurrency(3);
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}
gmxoilav

gmxoilav1#

如果是断断续续的,很可能回复时间太长了。这个信息似乎很清楚
可能超时,或使用共享回复主题
每个客户端示例必须使用自己的回复主题或专用分区。
编辑
如果收到的邮件的相关id与this.futures(待定回复)中当前的条目不匹配,则会获取日志。这只能在以下情况下发生:
请求超时(在这种情况下会有相应的警告日志)。
模板是stop()ped(在这种情况下,this.futures被清除)。
由于某种原因(不应该发生),已处理的回复将被重新传递。
在将密钥添加到this.futures之前会收到回复(因为它是在发送()或删除记录之前插入的,所以无法执行)。
服务器端为同一请求发送两个或多个回复。
其他应用程序正在向同一回复主题发送数据。如果您可以用调试日志记录来重现它,这将很有帮助,因为这样我们就可以在send上记录相关键。

cu6pst1q

cu6pst1q2#

在使用者上使用属性@header(kafkaheaders.correlation\u id)修复的问题

@KafkaListener(topics = "${kafka.topic.model}")
@SendTo("replymodeltopic")
@Override
public Model receive(ConsumerRecord<String, model> record, @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) { 
    record.headers().add(KafkaHeaders.CORRELATION_ID, correlation);
    return record.value();
}

在我的配置上我有

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.tunnel.group}")
    private String tunnelGroup;

    @Value("${kafka.topic.json.reply}")
    private String jsonTopicReply;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, tunnelGroup);

        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Model> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Model.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Model> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(replyTemplate());
        return factory;
    }

    @Bean
    public ProducerFactory<String, Model> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Model> replyTemplate() {
        KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic(jsonTopicReply);
        return kafkaTemplate;
    }

}

相关问题