配置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 50*1024*1024);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
打印整个有效负载,获取它和消费者数据的任何帮助都将有助于消费者记录(topic=x,partition=2,offset=1512343,createtime=1591460009853,serialized key size=8,serialized value size=9506789。。。。。。。
日志大小太大,无法打印和分析
编辑2:可以在大日志中看到更多此异常,原因是:org.springframework.kafka.kafkaexception:排队确认时中断
1条答案
按热度按时间eulz3vhy1#
我已打开一个新功能请求。
在大日志中可以看到更多此异常,原因是:
org.springframework.kafka.KafkaException: Interrupted while queuing ack
那个错误是因为你在打电话Acknowledgment.acknowledge()
在外线上MANUAL_IMMEDIATE AckMode
; 并且该外部线程被中断,阻止了ack的排队。如果可能的话,最好在侦听器线程上调用它。