我是Kafka的新手。我正在做一个个人项目,我想写两个不同的Avro主题,并使用leftJoin合并它们。一旦我合并了它们,我想产生相同的消息到KSQL数据库。(我还没有实现这部分)。
我正在使用Kafka模板生成2个Avro主题,并将它们转换为kstream以合并它们。我还在使用KafkaListener打印其中的任何消息,该工作正在进行中。以下是我遇到问题的地方:实际上是两个,在这两种情况下,它都不会在合并的主题中产生任何消息。
1.如果我从kstream中删除了consumed.with()
,那么它会抛出一个默认键Serde错误。
1.但是如果我保留它,那么它会抛出一个反序列化错误。
我甚至在我的application.properties
和main()
内部的streamConfig
中提供了默认的序列化和反序列化,但它仍然不起作用。
有人能帮我合并两个Avro主题吗?是因为我使用Avro模式而出错吗?我应该使用JSON吗?我想使用一个模式,因为我的消息的值部分将有多个值。
例如:{Key : Value}
= {company : {inventory_id, company, color, inventory}}
= {Toyota : {0, RAV4, 50,000}}
下面是所有文件的link:application.properties
、DefaultKeySerdeError.txt
、DeserializationError.txt
、FilterStreams.java
、Inventory.avsc
、Pricing.avsc
和MergedAvro.avsc
。如果您希望我将它们放在下面,请告诉我。非常感谢您的帮助!
https://gist.github.com/Arjun13/b76f53c9c2b4e88225ef71a18eb08e2f
1条答案
按热度按时间vs91vp4v1#
查看DeserializationError.txt文件,问题似乎是您没有提供schema注册表的凭据。即使您在www.example.com文件中提供了凭据application.properties,它们也不会进入serdes配置,因此如果您将
basic.auth.user.info
配置添加到serdeConfig
Map中,您应该已经设置好了。