Kafka Connect with JSON and Schema Registry

x759pob2 · posted 2021-06-04 in Kafka

I'm trying to use Kafka Connect's JdbcSinkConnector to store JSON messages from a Kafka topic in a PostgreSQL table. This should be supported as of Confluent Platform 5.5.
On the producer side I use the following serializer:

value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
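
For reference, a minimal set of producer properties for this serializer would look roughly like the following (the key serializer and the registry URL here are illustrative, not the exact values from my setup):

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
schema.registry.url=http://schema-registry:8081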

The schema registered in Schema Registry:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "http://example.com/myURI.schema.json",
  "title": "value_greeting",
  "description": "Sample schema to help you get started.",
  "type": "object",
  "properties": {

    "msg": {
      "type": "string",
      "description": "The string type is used for strings of text."
    },
    "name": {
      "type": "string",
      "description": "The string type is used for strings of text."
    }  
  }
}
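
For reference, a message value conforming to this schema would look something like this (example values only):

{
  "msg": "hello",
  "name": "world"
}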

Connector configuration:

"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "topics": "greeting",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
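
I create the connector by posting this configuration to the Kafka Connect REST API, roughly as follows (a sketch: it assumes the worker's default REST port 8083 and that the JSON above, with a top-level "name" field added, is saved as jdbc-sink.json):

curl -X POST -H "Content-Type: application/json" \
  --data @jdbc-sink.json \
  http://connect:8083/connectors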

Unfortunately, the connector fails with the following error:

ERROR WorkerSinkTask{id=GREE-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:492)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:111)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

What am I doing wrong?
Thanks!
