无法使用kafka jdbc接收器连接器将数据加载到postgres

vlf7wbxs  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(264)

我使用debezium源连接器将rds postgres中的数据带到kafka主题中。主题中的数据如下所示:

{"domain":"domain-new-34B","profile":"2423947a-asf23424","account":"aasdfadf","customer":"gaf23sdf","profileno":"324","user":"234234","updatedat":233463463456,"__deleted":"false"}

我正在使用kafka jdbc接收器连接器将数据发送到cloudsqlpostgres。我的sink connector.json文件如下所示:

{
"name":"postgres-sink-connector",
"config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max":1,
    "auto.create":true,
    "connection.url":"jdbc:postgresql://host:5432/testdb",
    "connection.user":"user1",
    "connection.password":"user123",
    "topics":"server1.public.topic1",
    "auto.create":"true",
    "auto.evolve":"true",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": ".",
    "table.name.format": "${topic}",
    "transforms": "route",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2_$3",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms": "unwrap",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
}

}
发布连接器时出现以下错误:

java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'postgres-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='server1.public.topic1',partition=0,offset=0,timestamp=1212312441756) with a HashMap value and null value schema.

我还没有创建目标表,我想自动创建它。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题