I'm trying to use kafka-connect-jdbc, but some values in the database differ from the values in the Kafka messages.
Sink config:
{
"name": "test-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/test_db?user=test&password=test",
"dialect.name": "PostgreSqlDatabaseDialect",
"topics.regex": "test.public.(.*)",
"transforms": "dropPrefix, unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "test.public.(.*)",
"transforms.dropPrefix.replacement": "public.test_$1",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_key",
"delete.enabled": "true",
"batch.size": "1"
}
}
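For reference, here is a rough sketch of how the `dropPrefix` settings above should rewrite topic names. It is a Python approximation of the RegexRouter behaviour (rewrite only when the whole topic name matches), not the connector's actual code; `route_topic` is a hypothetical helper and the topic names are examples. Note that the dots in the regex are unescaped, so they match any character.

```python
import re

# Values copied from the sink config above.
ROUTER_REGEX = r"test.public.(.*)"
ROUTER_REPLACEMENT = r"public.test_\1"  # Python spelling of the Java-style "$1"


def route_topic(topic: str) -> str:
    """Approximate RegexRouter: rewrite the topic only if the whole name matches."""
    if re.fullmatch(ROUTER_REGEX, topic) is None:
        return topic  # non-matching topics pass through unchanged
    return re.sub(ROUTER_REGEX, ROUTER_REPLACEMENT, topic, count=1)


# Example topic names (assumed, based on the Debezium message shown in this post).
print(route_topic("test.public.test"))
print(route_topic("other.public.test"))
```

With this config, every table captured under `test.public.*` should end up under its own `public.test_<table>` name, so two source tables should not be writing into the same sink table unless their names collide after routing.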
Message in the Kafka topic:
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"test_id"},{"type":"string","optional":true,"field":"test_b_c"},{"type":"string","optional":true,"field":"test_a_t"},{"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"test_p"},{"type":"int64","optional":true,"field":"test_d_id"}],"optional":true,"name":"test.public.test.value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"test_c_id"},{"type":"string","optional":true,"field":"test_b_c"},{"type":"string","optional":true,"field":"test_a_t"},{"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"test_p"},{"type":"int64","optional":true,"field":"test_id"}],"optional":true,"name":"test.public.test.value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txid"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"test.public.test.envelope"},"payload":{"before":null,"after":{"id":4441,"test_c_id":3606,"test_c":"qwerty","test_a_t":null,"test_p":["qwe","asd","zxc"],"test_id":22827},"source":{"version":"1.2.2.final","connector":"postgresql","name":"test","ts_ms":1599543319277,"snapshot":"false","db":"test","schema":"public","table":"test","txid":3914206,"lsn":108940649328,"xmin":null},"op":"u","ts_ms":1599543319509,"transaction":null}}
Example:
Kafka                 Sink DB
---------------------------------------
test_c_id: 3606       test_c_id: 22632
test_b_c: QWERTY      test_b_c: null
About 5-10% of the rows are mismatched, and all of the mismatches occur in only 2 of the 20 columns.
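To pin down exactly which columns differ for a given record, a comparison along these lines can help. This is only a sketch: `unwrap_after` and `diff_row` are hypothetical helpers, the message is assumed to be the JSON envelope shown above (with or without the `schema`/`payload` wrapper), and fetching the sink row is left to whatever database client is in use.

```python
import json


def unwrap_after(message_value: str) -> dict:
    """Return the `after` image of a Debezium change event, or {} for deletes."""
    envelope = json.loads(message_value)
    # With JSON schemas enabled the event sits under "payload"; otherwise it is top level.
    payload = envelope.get("payload", envelope)
    return payload.get("after") or {}


def diff_row(expected: dict, actual: dict) -> dict:
    """Map each differing column to a (kafka_value, db_value) pair."""
    return {
        col: (expected[col], actual.get(col))
        for col in expected
        if expected[col] != actual.get(col)
    }


# Example values taken from the comparison table above.
message = json.dumps(
    {"payload": {"after": {"id": 4441, "test_c_id": 3606, "test_b_c": "QWERTY"}}}
)
sink_row = {"id": 4441, "test_c_id": 22632, "test_b_c": None}

print(diff_row(unwrap_after(message), sink_row))
```

Running this over a sample of keys would show whether the two affected columns always differ together and whether the sink values correspond to an older or newer event for the same `id`.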
Does anyone know what could cause this kind of problem?