一个sink配置中每个主题(表)的pk.fields

nhhxz33t  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(696)

关于这个例子,debezium例子
我有多个主键不同的主题

item (pk : id)
itemDetail (pk :id, itemId)
itemLocation (pk :id, itemId)

jdbc-sink.source源

{
"name": "jdbc-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "item,itemDetail,itemLocation",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
}
}

如何为每个主题(表)指定“pk.fields”?

ttcibm8c

ttcibm8c1#

我不认为每个主题都有这样的pkMap配置。
您需要为每个主题进行多个配置

{
"name": "jdbc-sink-item",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "item",
    "pk.fields": "id",

{
"name": "jdbc-sink-itemDetail",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "itemDetail",
    "pk.fields": "id,itemId",

等等

相关问题