I'm using Kafka with Avro for serializing/deserializing events. If a bad record that doesn't conform to the Avro schema lands on the topic, the consumer fails with:
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition EventProcessor-0 at offset 2845. If needed, please seek past the record to continue consumption.
and the error keeps repeating at the same offset. Is it possible to skip this offset and continue consuming from the following ones, and if the same thing happens again, skip that offset as well?
Consumer code:
@KafkaListener(topics = "EventProcessor", containerFactory = "eventProcessorListenerContainerFactory")
public void listen(Event payLoad) {
    System.out.println("Received message ===> " + payLoad);
}
Factory:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> eventProcessorListenerContainerFactory() {
    Map<String, Object> propMap = new HashMap<>();
    propMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    propMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propMap.put(ConsumerConfig.GROUP_ID_CONFIG, "EventProcessorConsumer");

    DefaultKafkaConsumerFactory<String, Event> consumerFactory =
            new DefaultKafkaConsumerFactory<>(propMap);
    consumerFactory.setValueDeserializer(new AvroDeSerializer<>(Event.class));

    ConcurrentKafkaListenerContainerFactory<String, Event> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory);
    listenerFactory.setConcurrency(3);
    listenerFactory.setRetryTemplate(retryTemplate());
    listenerFactory.getContainerProperties().setPollTimeout(3000);
    return listenerFactory;
}
1 Answer
Try adjusting your retry policy as @poppy suggested.
Start here: https://docs.spring.io/spring-batch/3.0.x/reference/html/retry.html
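As a concrete starting point, here is a minimal sketch of the `retryTemplate()` bean that the question's factory already references. The exact policy values (3 attempts, 1-second back-off) are illustrative assumptions, not taken from the question; adjust them to your needs.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetryConfig {

    // Sketch of the retryTemplate() bean wired into the listener factory.
    @Bean
    RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Attempt each failed delivery at most 3 times (assumed value).
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Wait a fixed 1 second between attempts (assumed value).
        FixedBackOffPolicy backOff = new FixedBackOffPolicy();
        backOff.setBackOffPeriod(1000L);
        template.setBackOffPolicy(backOff);

        return template;
    }
}
```

One caveat: a retry policy alone won't get past a record that can never deserialize, since the failure happens inside the deserializer before the listener is invoked. If you are on a recent spring-kafka version, wrapping your value deserializer in spring-kafka's `ErrorHandlingDeserializer` is the usual way to let the container skip poison records instead of looping on the same offset.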