kafka confluent:如何在jdbc接收器连接器中使用pk.mode=record\键来执行upsert和delete模式?

wecizke3  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(444)

在kafka confluent中,如何在使用 pk.mode=record_key 对于mysql表中的复合键?upsert模式在使用 pk.mode=record_values . 是否需要进行其他配置?
如果我试着 pk.mode=record_key . 错误-由以下原因引起: org.apache.kafka.connect.errors.ConnectException :只需要定义一个pk列,因为记录的键架构是基元类型。下面是我的jdbc接收器连接器配置:

{
    "name": "<name>",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "<topic name>",
    "connection.url": "<url>",
    "connection.user": "<user name>",
    "connection.password": "*******",
    "insert.mode": "upsert",
    "batch.size": "50000",
    "table.name.format": "<table name>",
    "pk.mode": "record_key",
    "pk.fields": "field1,field2",
    "auto.create": "true",
    "auto.evolve": "true",
    "max.retries": "10",
    "retry.backoff.ms": "3000",
    "mode": "bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}
2q5ifsrm

2q5ifsrm1#

你需要使用 pk.moderecord.value . 这意味着从消息的值中获取字段,并将其用作目标表中的主键和 UPSERT 目的。
如果你设置 record.key 它将尝试从Kafka消息密钥中获取密钥字段。除非消息键中确实有值,否则这不是您要使用的设置。
这些可能会进一步帮助您:
?https://rmoff.dev/kafka-jdbc-video
?https://rmoff.dev/ksqldb-jdbc-sink-video

相关问题