如何在kafka主题中处理json?

yiytaume  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(411)

我正在构建一个java应用程序,其中有一些json对象(特别是com.satori.rtm.model.anyjson中的anyjson对象),我想在kafka主题中发送这些对象。我应该用任意字符串类型发送它们吗?我问这个是因为 KafkaProducer<K, V> 在(反)序列化时处理json值似乎有一些问题。
这是我的生产者配置

Properties props= new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerEndpoint);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
kafkaProducer= new KafkaProducer<Integer, AnyJson>(props);

其中jsonserializer.class来自org.springframework.kafka.support.serializer.jsonserializer;我看到有多个包处理json对象(如kafka.utils.json、com.google.gson.jsonobject;,等等)。然后在跑步的时候

ProducerRecord<Integer, AnyJson> record= new ProducerRecord<Integer, AnyJson>(topic, json);
kafkaProducer.send(record);

对于com.satori.rtm.connection.gsonserializer$jsonelementwrapper类,我有以下异常:找不到序列化程序
有什么帮助吗?

rdlzhqv9

rdlzhqv91#

您可能想使用 org.apache.kafka.common.serialization.ByteArraySerializer 用于键和值序列化。现在应该用byte[]配置生产者记录。
然后使用 ObjectMapper 来自Jackson(http://www.baeldung.com/jackson-object-mapper-tutorial )将任何json对象转换为字节数组。

相关问题