kafka connect jdbc在jsonconverter上失败

0dxa2lsx  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(555)

我正在设计一个mysql->debezium->kafka->flink->kafka->kafka connect jdbc->mysql。下面是我从flink写的示例消息(我也尝试过使用kafka控制台生成器)

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "Smith"
  }
}

但是在jsonconverter上连接失败

DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)

我已经调试过了,并且在方法中 public SchemaAndValue toConnectData(String topic, byte[] value) 值为空。我的接收器配置为:

{
    "name": "user-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "user",
        "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
        "connection.user": "root",
        "connection.password": "root",        
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

有人能帮我解决这个问题吗?

nwlls2ji

nwlls2ji1#

我认为问题与(kafka消息的)值序列化无关。这封信的钥匙有点问题。
你的工作是什么 key.converter ? 我想是一样的 value.converter ( org.apache.kafka.connect.json.JsonConverter ). 你的钥匙可能很简单 String ,不包含 schema , payload 试着改变 key.converterorg.apache.kafka.connect.storage.StringConverter 对于Kafka连接,您可以设置默认值 Converters ,但也可以为特定的连接器配置设置特定的连接器配置(这将覆盖默认的连接器配置)。为此,您必须修改配置请求:

{
    "name": "user-sink",
    "config": {
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "user",
        "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
        "connection.user": "root",
        "connection.password": "root",        
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

相关问题