我和Kafka一起上课:
@Slf4j
@Component
@RequiredArgsConstructor
public class DocumentListener {
@KafkaListener(topics = "${spring.kafka.consumer.from-hotfolder-transferer.topic}")
public void listen(@Payload TaskDto taskDto, @Header(KafkaConst.HEADER_KEY) String serviceName) {
... some logic
}
字符串
当消息命中主题时,我在日志中看到:
>! 2023-11-08T15:30:05.783+03:00 INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : gpn-1: partitions assigned: [task-to-ocr-0]
>! 2023-11-08T15:39:03.175+03:00 INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=gpn-1-0, groupId=gpn-1] Node -1 disconnected.
>! 2023-11-08T15:58:27.194+03:00 INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
>! ______
>! here some values kafka admin
>! ______
>! 2023-11-08T15:58:27.219+03:00 INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.4.1
>! 2023-11-08T15:58:27.219+03:00 INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8a516edc2755df89
>! 2023-11-08T15:58:27.219+03:00 INFO [worker-ocr ] 28396 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1699448307219
>! 2023-11-08T15:58:27.235+03:00 INFO [worker-ocr ] 28396 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for adminclient-1 unregistered
>! 2023-11-08T15:58:27.242+03:00 INFO [worker-ocr ] 28396 --- [| adminclient-1] o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
>! 2023-11-08T15:58:27.243+03:00 INFO [worker-ocr ] 28396 --- [| adminclient-1] o.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
>! 2023-11-08T15:58:27.243+03:00 INFO [worker-ocr ] 28396 --- [| adminclient-1] o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
型
我的消费者财产:
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConfig {
private final KafkaProducerProperties kafkaProducerProperties;
@Bean
public ConsumerFactory<String, TaskDto> taskDtoConsumerFactory(KafkaProperties kafkaProperties, KafkaCustomProperties kafkaCustomProperties) {
KafkaCustomProperties.KafkaConsumerProperties consumer = kafkaCustomProperties.getConsumer().get(KafkaConst.CONSUMER_CONFIG);
Map<String, Object> prop = kafkaProperties.buildConsumerProperties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskDtoDeserializer.class);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getClientId());
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, consumer.getClientId());
prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, consumer.getPollInterval());
return new DefaultKafkaConsumerFactory<>(
prop,
new StringDeserializer(),
new ErrorHandlingDeserializer<>(new TaskDtoDeserializer())
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, TaskDto> kafkaListenerContainerFactory(
ConsumerFactory<String, TaskDto> taskDtoConsumerFactory,
KafkaCustomProperties kafkaCustomProperties) {
ConcurrentKafkaListenerContainerFactory<String, TaskDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(taskDtoConsumerFactory);
factory.setCommonErrorHandler(errorHandler(kafkaCustomProperties));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.getContainerProperties().setObservationEnabled(true);
factory.afterPropertiesSet();
return factory;
}
型
使用Kafka本身的设置,一切正常,我可以使用大数据工具阅读此主题的消息。
1条答案
按热度按时间9rygscc11#
您还没有设置auto.offset.reset,所以它将默认从主题的末尾开始消费。
此外,此
from-hotfolder-transferer.topic
属性对于Sping Boot 自动配置无效。您可以将您的topic属性放在其他任何位置