Kafka Connect JDBC sink connector with Avro fails on the second insert

Asked by yacmzcpb on 2021-06-04 in Kafka

I am trying to follow a basic JdbcSinkConnector example based on a Confluent tutorial. The example uses a MySQL database with an Avro topic and Schema Registry.
It works for the first message that reaches the topic: the data is correctly inserted into the MySQL table. For the second message, however, I get the error "Schema with id 26 not found".
I start by creating the sink job:

{
  "name": "mySqlTest5",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.user": "root",
    "connection.password": "some-password",
    "topics": "orders",
    "tasks.max": 1,
    "auto.create": true,
    "connection.url": "jdbc:mysql://some-url",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://dp-cp-schema-registry:8081",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id"
  }
}
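For reference, registering this JSON with the Kafka Connect REST API can be sketched in Python. The Connect address (localhost:8083) is an assumption on my part, not something stated in the post:

```python
import json
import urllib.request

# Hedged sketch: build the POST request that registers a sink connector with
# the Kafka Connect REST API. localhost:8083 is an assumed Connect address.
def build_register_request(config: dict,
                           connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Against a live cluster you would then run urllib.request.urlopen(req).
req = build_register_request({"name": "mySqlTest5", "config": {}})
print(req.get_method(), req.full_url)
```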

Then I send the first message to the topic:

kafka-avro-console-producer --broker-list kafka:9092 \
  --topic orders3 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"orders","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
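As a sanity check, the value.schema string above is plain JSON; parsing it (a Python sketch, not part of the original workflow) confirms the field list includes the id field named in pk.fields:

```python
import json

# The value.schema passed to kafka-avro-console-producer, as one JSON string.
schema_json = ('{"type":"record","name":"orders","fields":'
               '[{"name":"id","type":"int"},{"name":"product","type":"string"},'
               '{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}')

schema = json.loads(schema_json)
field_names = [f["name"] for f in schema["fields"]]
print(field_names)  # ['id', 'product', 'quantity', 'price']
```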

Once the producer is waiting for user input, I enter:

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

This successfully inserts the item into the database!
While the console producer is still waiting for input, I enter a second message:

{"id": 101, "product": "bar", "quantity": 200, "price": 70}

which causes the following exception:

Error retrieving Avro schema for id 26
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:323)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:311)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:184)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:297)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:202)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:158)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:134)
I don't understand why the first message is inserted correctly, and a subject with the correct id is created in the Schema Registry (/subjects/orders-value), while the second insert fails.
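For context (background knowledge, not stated in the post): the id in the error comes from the Confluent Avro wire format, in which every serialized message starts with a magic byte 0 followed by a 4-byte big-endian schema id; the AvroConverter reads that id from the message and asks the registry for the matching schema. A minimal sketch of that framing:

```python
import struct

# Confluent wire format: magic byte 0x00, then a 4-byte big-endian schema id,
# then the Avro-encoded payload.
def frame(schema_id: int, payload: bytes = b"") -> bytes:
    return struct.pack(">bI", 0, schema_id) + payload

def schema_id_of(message: bytes) -> int:
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not a Confluent-framed Avro message")
    return schema_id

msg = frame(26, b"...avro bytes...")
print(schema_id_of(msg))  # 26
```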
UPDATE: For some reason, the schema data in the Avro message envelope has an extra field (schema.registry.schema.version=1). The original schema does not contain this field, but it shows up in the logs:

Sending POST with input {"schema": "{\"type\":\"record\",\"name\":\"statuses\",\"namespace\":\"my.namespace\",\"fields\":[...],\"schema.registry.schema.version\":1}"} to http://dp-cp-schema-registry:8081/subjects/mysubject?deleted=true (io.confluent.kafka.schemaregistry.client.rest.RestService)

Which process in the pipeline is adding this "schema.registry.schema.version" field?
