我们已经在kafkastreams中使用confluent schema registry一年多了,一切都运行得很好;直到昨天。
在uat环境中,我们似乎删除了一个模式主题,并且我们的一个应用程序开始使用消息进行故障转移
[错误]logandfailexceptionhandler-反序列化过程中捕获异常,taskid:0\u 13,主题:主题名称,分区:13,偏移量:0 org.apache.kafka.common.errors.serializationexception:检索id 1531的avro架构时出错
我检查了schema注册表,发现主题丢失,并用curl查询错误中列出的id 1531,例如:
curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531
然后回来了:
{"error_code":40403,"message":"Schema not found"}
我天真地只是尝试再次注册模式而没有考虑它,它工作了,但该模式注册的id与以前的1531 id不同。
我需要注册到ID1531的模式,因为主题中现有的消息已经在魔法字节中包含了ID1531。
我查了api文档https://docs.confluent.io/current/schema-registry/docs/develop/api.html 但没有看到为模式设置给定id的任何内容。
是否有任何方法可以使用schema registry将schema强制为特定的id?
我知道一些备份解决方案,但我现在正在寻找一个修复,希望能防止数据丢失或非常措施,以修复主题数据。
1条答案
按热度按时间3qpi33ja1#
是否有任何方法可以使用schema registry将schema强制为特定的id?
没有。
1531的id实际上并没有“消失”,顺便说一下,它只是在注册表中被标记为已删除(使用
_schemas
主题)。据我所知,当你使用Kafka夫罗德塞利泽时,真的没有办法避免这个错误。您必须使用bytearraydeserializer,然后使用schema registry客户端“修复”或“查找”正确的id,然后反序列化消息的其余部分。
另一个选项是重置用户组,以便完全跳过这些消息,或者设置异常处理。使用kafka的streams api处理错误消息