为什么我的Kafka消费者读不到我的Kafka制作者的信息?

n1bvdmb6  于 2023-01-13  发布在  Apache
关注(0)|答案(1)|浏览(215)

我是Kafka的新手,我只是尝试在我的node.js项目中创建Kafka Producer-使用kafkajs包-它可以向我的Sping Boot 应用程序发送字符串。
这是我的制作人的样子:

router.put('/kafka/test', async (req, res) => {
    try {

        await producer.connect();
        console.log('kafka connected');
        await producer.send({
            topic: 'kafkaStringTest',
            messages: [
                { value: "{ \"test\": \"This is my test string\" }" }
            ]
        });

        ...

    } catch (err) {
    res.status(500).json({ message: err.message });
}

这是我的消费者的样子:

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "kafkaStringTest")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

当我请求/kafka/test端点时,使用者抛出以下错误:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.serialization.StringDeserializer] to [java.lang.String] for GenericMessage [payload=org.apache.kafka.common.serialization.StringDeserializer@44e58368, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@42a9ac04, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=kafkaStringTest, kafka_receivedTimestamp=1671019397060, kafka_groupId=***}]

如何修复此错误?

xfb7svmp

xfb7svmp1#

如文档中所述:
此机制需要在@Configuration类之一上使用@EnableKafka注解,并需要侦听器容器工厂,该工厂用于配置基础ConcurrentMessageListenerContainer。默认情况下,需要使用名为kafkaListenerContainerFactory的Bean。
检查文档
在这个工厂中,您需要将StringDeserializer设置为消费者属性。(我假设您在实现Kafka生产者时使用了StringSerializer)

相关问题