我正在尝试使用replyingkafkatemplate,并且断断续续地看到下面的消息。
无挂起回复:consumerrecord(主题=请求-回复主题,分区=8,偏移量=1,createtime=1544653843269,序列化键大小=-1,序列化值大小=1609,标头=记录标头(标头=[记录标头(键=kafka\u correlationid,值=[-14,65,21,-118,70,-94,72,87,-113,-91,92,72,-124,-110,-64,-94]),isreadonly=false),key=null,correlationid:[-18271255759235816475365319231847350110],可能超时,或者使用共享回复主题
它将源于下面的代码
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
if (this.sharedReplyTopic) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(missingCorrelationLogMessage(record, correlationId));
}
}
else if (this.logger.isErrorEnabled()) {
this.logger.error(missingCorrelationLogMessage(record, correlationId));
}
}
但只会在无意中发生
我还将sharedreplytopic设置为false,如下所示,并尝试强制更长的超时时间
ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;
我的容器如下
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setIdleEventInterval(10000L);
factory.setConcurrency(3);
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
2条答案
按热度按时间gmxoilav1#
如果是断断续续的,很可能回复时间太长了。这个信息似乎很清楚
可能超时,或使用共享回复主题
每个客户端示例必须使用自己的回复主题或专用分区。
编辑
如果收到的邮件的相关id与this.futures(待定回复)中当前的条目不匹配,则会获取日志。这只能在以下情况下发生:
请求超时(在这种情况下会有相应的警告日志)。
模板是stop()ped(在这种情况下,this.futures被清除)。
由于某种原因(不应该发生),已处理的回复将被重新传递。
在将密钥添加到this.futures之前会收到回复(因为它是在发送()或删除记录之前插入的,所以无法执行)。
服务器端为同一请求发送两个或多个回复。
其他应用程序正在向同一回复主题发送数据。如果您可以用调试日志记录来重现它,这将很有帮助,因为这样我们就可以在send上记录相关键。
cu6pst1q2#
在使用者上使用属性@header(kafkaheaders.correlation\u id)修复的问题
在我的配置上我有