java 使用Sping Boot Kafka模板接收消息

dgsult0t  于 2023-05-05  发布在  Java
关注(0)|答案(1)|浏览(184)

在Using Kafka with Sping Boot 的上下文中,我发现了很多生成消息和接收消息的示例,但我正在构建一个测试框架以在代码中使用,我想从文档中探索更多这一点:
https://docs.spring.io/spring-kafka/reference/html/#kafka-template-receive
我基本上想要一个简单的方法来获取数据从一个主题在一个同步的方式,但每次我试图使用我的KafkaTemplate接收消息

kafkaTemplate.receive(config.getTopic(), 0, 0)

它给我这个错误:

Method threw 'java.lang.IllegalArgumentException' exception.

A consumerFactory is required

我真的只是有兴趣了解是否有任何好的例子,如何使用这个。我所做的事情可能没有太大的价值,但我真的只是想在我的应用程序中进行一些测试,以确保我在正确的主题中向Kafka生成消息,等等。我希望有一个简单的测试方法,而不需要构建一个批量消费者只是为了测试。如果KafkaTemplate中存在这种“receive”方法,我应该能够使用它们,但不知何故,我还没有找到一个很好的例子来说明如何做到这一点。
更新:我已经看到我可能需要定义一个消费者工厂,只是猜测是否有一种简单的方法从生产者配置复制配置

ProducerFactory<String, byte[]> producerFactory = kafkaTemplate.getProducerFactory();
    Map<String, Object> props = producerFactory.getConfigurationProperties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-receive");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    kafkaTemplate.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));

我认为我走在正确的道路上,但我仍然不能真正从模板构建消费者:

Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at app//org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at app//org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371)
    at app//org.springframework.kafka.core.KafkaTemplate.receive(KafkaTemplate.java:574)
    at app//org.springframework.kafka.core.KafkaOperations.receive(KafkaOperations.java:295)

更新2:我的问题肯定是缺少了kafkaTemplate.setConsumerFactory也像在评论中提到的,我得到的第二个错误是从复制producerConfigs到consumerConfigs(我在生产者配置中有一个拦截器)

wvt8vs2t

wvt8vs2t1#

我认为经过一些调查,也在评论kafkaTemplate.receive的答案是不是真的我应该使用。主要是因为它每次只消耗一条记录。

@Override
    public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
        Properties props = oneOnly();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
            requested.forEach(tpo -> {
                if (tpo.getOffset() == null || tpo.getOffset() < 0) {
                    throw new KafkaException("Offset supplied in TopicPartitionOffset is invalid: " + tpo);
                }
                ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
                List<ConsumerRecord<K, V>> consumerRecords = records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>());
                if (one != null) {
                    consumerRecords.add(one);
                }
            });
            return new ConsumerRecords<>(records);
        }
    }

也许这就是我要找的https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/test/utils/KafkaTestUtils.html

相关问题