java使用来自ApacheKafka的avro格式的消息

cclgggtu  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(547)

我想使用spring-boot配置的使用者(使用spring-kafka库)使用kafka代理中的消息,源代码是一个jdbc连接器,负责从mysql数据库中提取消息,这些消息需要使用
下面我附上我的application.yml文件

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

消费者-

@KafkaListener(topics = "Sample-Topic", groupId = "group_id")
    public void consume(String message) throws IOException {
        System.out.println(message);
    }

主题中的消息以avro格式存储,因为我正在使用jdbc连接器。
我得到的不是json,而是加密输出,有些输出消息只是普通字符串。

kxe2p93d

kxe2p93d1#

您需要使用avro反序列化程序而不是 StringDeserializer .

相关问题