kafka-如何跳过偏移量中的错误消息并使用其余的消息

vc6uscn9  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(390)

我将kafka与avro一起用于序列化/反序列化以处理事件。如果碰巧一个不符合avro模式的坏数据出现在主题中,

.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer app: host: dcId: envId: url: reqId: jsess: secSessId: logUser: effUser: impUser: channelName: - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition EventProcessor-0 at offset 2845. If needed, please seek past the record to continue consumption.

而且消息以相同的偏移量不断增长。有没有可能跳过这个偏移量,继续从其他偏移量中读取,如果同样的情况再次发生,也跳过那个偏移量?
消费者代码:

@KafkaListener(topics = "EventProcessor", containerFactory = "eventProcessorListenerContainerFactory")
    public void listen(Event payLoad) {

        System.out.println("REceived  message ===> " + payLoad);

    }

工厂:

@Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> eventProcessorListenerContainerFactory() {

        Map<String, Object> propMap = new HashMap<String, Object>();
        propMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        propMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propMap.put(ConsumerConfig.GROUP_ID_CONFIG, "EventProcessorConsumer");

        DefaultKafkaConsumerFactory<String, Event> consuemrFactory = new DefaultKafkaConsumerFactory<String, Event>(
                propMap);

        consuemrFactory.setValueDeserializer(new AvroDeSerializer<Event>(
                Event.class));
        ConcurrentKafkaListenerContainerFactory<String, Event> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consuemrFactory);
        listenerFactory.setConcurrency(3);
        listenerFactory.setRetryTemplate(retryTemplate());
        listenerFactory.getContainerProperties().setPollTimeout(3000);
        return listenerFactory;
    }
imzjd6km

imzjd6km1#

试着按照@poppy的建议调整你的政策

SimpleRetryPolicy policy = new SimpleRetryPolicy();
// Set the max retry attempts
policy.setMaxAttempts(5);
// Retry on all exceptions (this is the default)
policy.setRetryableExceptions(new Class[] {Exception.class});
// ... but never retry SerializationException
policy.setFatalExceptions(new Class[] {SerializationException.class}); //<-- here

// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
template.execute(new RetryCallback<Foo>() {
    public Foo doWithRetry(RetryContext context) {
        // business logic here
    }
});

从这里开始:https://docs.spring.io/spring-batch/3.0.x/reference/html/retry.html

相关问题