我生成Kafka(confluentinc/cp-server:7.0.1)主题my-topic
的事件,我使用schema-registry(confluentinc/cp-schema-registry:7.1.0)来存储我的schemas(protobuf)。
我使用kafka-connect(confluentinc/cp-kafka-connect:7.0.1-1-ubi 8)连接到mongoDB
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "my-topic",
"key.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"connection.uri": "mongodb://mongo:27017",
"database": "my-database",
"collection": "my-collection",
"tasksMax": "1",
字符串
问题是,连接器在连接schema-registry时正在查找模式名my-topic-value
,当然它找不到,因此二进制文件不能被反序列化。连接器返回错误SerializationException error code 40401
并报告找不到my-topic-value
。
所以我发布了主题为my-topic-value
的事件(不改变仍然有"topics": "my-topic"
的连接器),然后它工作了,但这样做是不可行的。有什么解决办法吗?
- 谢谢-谢谢
1条答案
按热度按时间bihw5rsg1#
主题有键和值。两者都可以有不同的模式,因此它们分别Map到注册表中的主题,而不是主题本身。
它可以更改,但不能删除后缀。https://developer.confluent.io/courses/schema-registry/schema-subjects/
更改主题名称不是正确的解决方案。相反,将模式注册到正确的主题(my-topic),这是任何使用Protobuf序列化器的生产者都应该自动完成的(除非它们也覆盖了主题策略)