有一个nodejs kafka producer将文件内容发送给kafka。当Kafka消费者消费了来自Kafka的消息时-
{"type":"Buffer","data":[91,13 .....]
当我使用 m.message.value.toString('utf8')
在nodejs kafka consumer中,它打印实际的消息。但我需要在java中消费kafka消费者。我试过了 property.put("value.serializer.encoding", "utf8")
以及 new String(consumerRecord.value())
但仍然打印{ "type":"Buffer","data":[91,13 .....]
. 我的问题是如何使用nodejs kafka producer生成的java字符串格式的消息。
1条答案
按热度按时间30byixjq1#
您应该在生产者和消费者api调用中使用适当的键和值序列化程序。
kafka提供了一些默认的键和值序列化程序,如stringserializer等。
用于生成和使用字符串消息的字符串序列化程序。
props.put(“key.serializer”,“org.apache.kafka.common.serialization.stringserializer”);props.put(“value.serializer”,“org.apache.kafka.common.serialization.stringserializer”);
例如,您可以引用生产者和消费者文档。
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/kafkaproducer.htmlhttps用法:/kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/kafkaconsumer.html
schema registry当您从nodejs发送消息并使用kafka consumer使用该消息时,您需要在javakafka consumer中提供相同的消息反序列化器键和值。
有一种方法可以解决这个不兼容的问题,那就是使用schema registry来使用kafka avro存储消息的模式。然后我们可以在nodejs kafka producer和javakafka consumer中使用该模式来使用消息。
以avro为例的nodejs-kafka生产者
下面是一个nodejs kafka avro的例子
javakafka使用者使用模式注册表示例