我想使用avro和合流模式注册表将json格式的数据写入kafka集群。我已经在confluent schema registry中创建了一个模式,如下所示:
json如下所示:
在nifi中,我目前使用的是publishkafkarecord\u 2\u 6处理器,其配置如下:
为了处理json,我使用了如下配置的JSonthereReader:
要写入kafka,我们使用的是avrorecordsetwriter,其配置如下:
当我看一看Kafka上写着什么时,我得到了这样一个神秘的东西:
有人能指出我的错误吗?
提前谢谢。
1条答案
按热度按时间oiopk7p51#
avro是一种二进制格式,恰好有一个json表示。这个
AvroRecordSetWriter
仅使用二进制格式。要写出json,需要使用json记录集编写器。所以,据我所知,你看到的只是Kafka队列中消息的二进制表示。这将是在处理器中使用avro编写器的预期行为。
fwiw,这正是你应该用Kafka在大多数成本模式。二进制格式更小、更简洁,因此Kafka将更快地在两端处理它。您还可以将其设置为不将模式写入二进制blob,这样它将非常简洁,然后让下游系统使用基于主题名或其他内容的约定来选择模式。