Schema Registry rejects an unchanged schema as incompatible

qgelzfjb · published 2021-06-07 · in Kafka

We have a Kafka cluster, and the Avro schemas it uses are stored in Confluent's Schema Registry. On a recent redeployment of (one of) our Streams applications, we started seeing incompatible-schema errors on a single topic (EmailSent). This is the only topic that fails, and we get the error every time a new EmailSent event is committed to it.
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"EmailSent","namespace":"com.company_name.communications.schemas","fields":[{"name":"customerId","type":"long","doc":"Customer's ID in the customers service"},{"name":"messageId","type":"long","doc":"The message ID of the sent email"},{"name":"sentTime","type":{"type":"string","avro.java.string":"String"},"doc":"The campaign's sent time, in the format 'yyyy-MM-dd HH:mm:ss.SSS'"},{"name":"campaignId","type":"long","doc":"The ID of the campaign in the marketing suite"},{"name":"appId","type":["null","long"],"doc":"The app ID associated with the sent email, if the email was related to a particular app","default":null}],"version":1}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:91)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:35)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:79)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:232)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:245)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:193)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:35)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:63)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:222)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:409)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:308)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:939)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)
This schema has been unchanged since June 2018, and we had successfully processed EmailSent events up until now.
The PR associated with this deployment of the Streams application changes neither the schema, nor the Streams processor that throws the error, nor any of the application's dependencies. My suspicion falls on the Schema Registry itself. Has anyone run into something similar, or have any insight into what could be causing this failure? I couldn't find any information about error code 409; does it mean anything to anyone?
Thanks in advance.
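For what it's worth, the version the registry already holds can be pulled straight from its REST API and compared against the schema in the error message. The registry URL and subject name below are assumptions (the subject for a topic's value schema is conventionally `<topic>-value`); adjust them to your environment:

```shell
# Assumed registry URL and subject name -- adjust to your environment.
REGISTRY=http://schema-registry:8081
SUBJECT=emailsent-value

# Latest registered schema for the subject:
curl -s ${REGISTRY}/subjects/${SUBJECT}/versions/latest

# Per-subject compatibility override, if one exists
# (a 40401 error means the global compatibility setting applies):
curl -s ${REGISTRY}/config/${SUBJECT}
```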


6ovsh4lw1#

I don't think the server is lying. You haven't shown us the two schemas so they can be compared (the one in the registry and the one in the error message).
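To spot what actually differs between the two, a minimal local sketch (my own illustration, not part of any registry API) that diffs the fields of two Avro record definitions given as JSON strings; note this is a field-level diff only, not a full Avro compatibility check:

```python
import json

def schema_fields(schema_json: str) -> dict:
    """Map field name -> (canonical type JSON, default) for an Avro record schema."""
    schema = json.loads(schema_json)
    return {
        f["name"]: (json.dumps(f["type"], sort_keys=True), f.get("default", "<none>"))
        for f in schema.get("fields", [])
    }

def diff_schemas(registered: str, new: str) -> list:
    """Return (field, registered_entry, new_entry) tuples for fields that differ."""
    old_f, new_f = schema_fields(registered), schema_fields(new)
    return [
        (name, old_f.get(name), new_f.get(name))
        for name in sorted(old_f.keys() | new_f.keys())
        if old_f.get(name) != new_f.get(name)
    ]
```

If the result is empty, the schemas really are field-for-field identical and the mismatch lies elsewhere (for example in the subject's compatibility configuration).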
One way to work around this is to set the subject's compatibility level to NONE:

export KAFKA_TOPIC=logEvents
curl -X PUT http://schema-registry:8081/config/${KAFKA_TOPIC}-value -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"

(The same works for ${KAFKA_TOPIC}-key, if needed.)
Then upload your new schema.
However:
once you are done, set the compatibility back to BACKWARD (or whatever it was originally configured as).
Be aware that this workaround can break Avro consumers that are reading messages written with both the old schema and the new, incompatible one.
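Putting that together, a sketch of the final steps against the Schema Registry REST API (the registry URL, subject name, and the toy `{"type": "string"}` schema are placeholders; substitute your real ones). The `/compatibility` endpoint lets you dry-run a schema against the latest registered version before touching anything:

```shell
REGISTRY=http://schema-registry:8081
SUBJECT=logEvents-value

# Dry-run the candidate schema against the latest registered version.
# The "schema" field carries the Avro definition as an escaped JSON string.
curl -s -X POST ${REGISTRY}/compatibility/subjects/${SUBJECT}/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\": \"string\"}"}'

# After registering the new schema, restore the previous compatibility level:
curl -s -X PUT ${REGISTRY}/config/${SUBJECT} \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'
```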
