Kafka does not read messages from the topic

k10s72fa  posted 11 months ago  in  Apache

I have a class with a Kafka listener:

@Slf4j
@Component
@RequiredArgsConstructor
public class DocumentListener {

    @KafkaListener(topics = "${spring.kafka.consumer.from-hotfolder-transferer.topic}")
    public void listen(@Payload TaskDto taskDto, @Header(KafkaConst.HEADER_KEY) String serviceName) {
        // ... some logic
    }
}

When a message hits the topic, I see this in the logs:

>! 2023-11-08T15:30:05.783+03:00  INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : gpn-1: partitions assigned: [task-to-ocr-0]
>! 2023-11-08T15:39:03.175+03:00  INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=gpn-1-0, groupId=gpn-1] Node -1 disconnected.
>! 2023-11-08T15:58:27.194+03:00  INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
>! ______
>! here some values kafka admin
>! ______
>! 2023-11-08T15:58:27.219+03:00  INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.4.1
>! 2023-11-08T15:58:27.219+03:00  INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8a516edc2755df89
>! 2023-11-08T15:58:27.219+03:00  INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1699448307219
>! 2023-11-08T15:58:27.235+03:00  INFO [worker-ocr ] 28396 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-1 unregistered
>! 2023-11-08T15:58:27.242+03:00  INFO [worker-ocr ] 28396 --- [| adminclient-1] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
>! 2023-11-08T15:58:27.243+03:00  INFO [worker-ocr ] 28396 --- [| adminclient-1] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
>! 2023-11-08T15:58:27.243+03:00  INFO [worker-ocr ] 28396 --- [| adminclient-1] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed


My consumer configuration:

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConfig {

    private final KafkaProducerProperties kafkaProducerProperties;

    @Bean
    public ConsumerFactory<String, TaskDto> taskDtoConsumerFactory(KafkaProperties kafkaProperties, KafkaCustomProperties kafkaCustomProperties) {
        KafkaCustomProperties.KafkaConsumerProperties consumer = kafkaCustomProperties.getConsumer().get(KafkaConst.CONSUMER_CONFIG);
        Map<String, Object> prop = kafkaProperties.buildConsumerProperties();
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskDtoDeserializer.class);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getClientId());
        prop.put(ConsumerConfig.CLIENT_ID_CONFIG, consumer.getClientId());
        prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, consumer.getPollInterval());
        return new DefaultKafkaConsumerFactory<>(
                prop,
                new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new TaskDtoDeserializer())
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TaskDto> kafkaListenerContainerFactory(
            ConsumerFactory<String, TaskDto> taskDtoConsumerFactory,
            KafkaCustomProperties kafkaCustomProperties) {
        ConcurrentKafkaListenerContainerFactory<String, TaskDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(taskDtoConsumerFactory);
        factory.setCommonErrorHandler(errorHandler(kafkaCustomProperties));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.getContainerProperties().setObservationEnabled(true);
        factory.afterPropertiesSet();
        return factory;
    }


The Kafka setup itself is fine: I can read the messages in this topic using Big Data Tools.

9rygscc1


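You have not set auto.offset.reset, so it defaults to latest: a consumer group with no committed offsets starts at the end of the topic and only receives records published after it subscribes.
Also, the from-hotfolder-transferer.topic property is not a valid Spring Boot auto-configuration property, because Boot binds the spring.kafka.consumer prefix to its own Kafka consumer settings. You can put your topic property anywhere else, for example under an application-specific prefix.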
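To illustrate the auto.offset.reset point, here is a minimal sketch that sets the reset policy on a consumer property map. It uses only the JDK so it can run without Kafka on the classpath; the class OffsetResetSketch and the method withEarliestReset are hypothetical names introduced for this example and do not come from the question or from Spring Kafka. The string key is the one that the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG constant stands for.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the original post.
final class OffsetResetSketch {

    // Same key that ConsumerConfig.AUTO_OFFSET_RESET_CONFIG resolves to.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Returns a copy of the given consumer properties with the reset policy
    // set to "earliest", so a group that has no committed offsets starts
    // reading from the beginning of each assigned partition.
    static Map<String, Object> withEarliestReset(Map<String, Object> base) {
        Map<String, Object> props = new HashMap<>(base);
        props.put(AUTO_OFFSET_RESET, "earliest");
        return props;
    }
}
```

In the question's taskDtoConsumerFactory, the equivalent would presumably be a single extra line, prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), before the DefaultKafkaConsumerFactory is created, or the Boot property spring.kafka.consumer.auto-offset-reset=earliest. Note that this policy only applies when the group has no committed offset for a partition; if the group gpn-1 has already committed offsets, it resumes from those regardless of this setting.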
