如何使instaclustr kafka sink connector与postgres的avro序列化值一起工作?

xmd2e60i  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(783)

我有一个Kafka主题的avro序列化值。
我正在尝试设置一个jdbc(postgres)接收器连接器,以便将这些消息转储到postgres表中。
但是,我得到下面的错误。

"org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.avro.AvroConverter for configuration value.converter: Class io.confluent.connect.avro.AvroConverter could not be found."

我的sink.json是

{"name": "postgres-sink",
  "config": {
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max":"1",
    "topics": "<topic_name>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "instaclustr_schema_registry_host:8085",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres?currentSchema=local",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "auto.create": "true",
    "auto.evolve":"true",
    "pk.mode":"none",
    "table.name.format": "<table_name>"
  }
}

此外,我还对connect distributed.properties(引导服务器)进行了更改。
我执行的命令是-

curl -X POST -H "Content-Type: application/json" --data @postgres-sink.json https://<instaclustr_schema_registry_host>:8083/connectors
f3temu5u

f3temu5u1#

io.confluent.connect.avro.AvroConverter 不是apache kafka发行版的一部分。您可以将apachekafka作为confluent平台的一部分运行(confluent平台随附,更简单),也可以单独下载并自行安装。

相关问题