kafka connect,jdbcsinkconnector-getting“检索id 1的avro架构时出错,未找到主题”;错误代码:40401“

7uzetpgm  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(347)

我创建了一个nifi流,它最终将json记录发布为带有avro编码值和字符串键的记录,使用confluent registry中的模式作为值模式。以下是nifi中avrorecordsetwriter的配置。
我现在正在尝试使用Kafka连接( connect-standalone )使用jdbcsinkconnector将消息移动到postgresql数据库,但出现以下错误:检索id 1的avro架构时出错
我已经确认我的汇合注册表中有一个模式,id为1。下面是我对连接任务的配置
辅助进程配置:

bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java

连接器配置:

name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true

我在nifi中创建了一个正确使用消息的流,并且通过指定 --property schema.registry.url=http://schema-registry:8081 . 请注意,我在docker容器中运行使用者,这就是url不是localhost的原因。
我不确定我错过了什么。我唯一的想法是我使用了错误的类作为密钥转换器,但是对于给定的错误,这是没有意义的。有人能看出我做错了什么吗?

2exbekwf

2exbekwf1#

我不太了解nifi,但是我看到模式的名称是“rds”,在错误日志中,它说它没有在模式注册表中找到主题。
Kafka的使用 KafkaAvroSerializer 序列化avro记录,同时在模式注册表中注册相关的avro模式。it使用 KafkaAvroDeserializer 反序列化avro记录并从schema注册表检索关联的schema。
schema registry将schema存储到名为“subjects”的类别中,为记录命名subject的默认行为是: topic_name-value 对于值记录和 topic_name-key 为了钥匙。
在您的例子中,您没有向kafka注册模式,而是向nifi注册模式,因此我猜测名称“rds”将出现在模式注册表中或是模式注册表上的主题名称。
您是如何验证您的架构是否正确存储的?
通常情况下,正确的主题是 rds-value 因为您只在值记录上使用模式注册表。

相关问题