在kafka confluent中,如何在使用 pk.mode=record_key
对于mysql表中的复合键?upsert模式在使用 pk.mode=record_values
. 是否需要进行其他配置?
如果我试着 pk.mode=record_key
. 错误-由以下原因引起: org.apache.kafka.connect.errors.ConnectException
:只需要定义一个pk列,因为记录的键架构是基元类型。下面是我的jdbc接收器连接器配置:
{
"name": "<name>",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "<topic name>",
"connection.url": "<url>",
"connection.user": "<user name>",
"connection.password": "*******",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "<table name>",
"pk.mode": "record_key",
"pk.fields": "field1,field2",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
1条答案
按热度按时间2q5ifsrm1#
你需要使用
pk.mode
的record.value
. 这意味着从消息的值中获取字段,并将其用作目标表中的主键和UPSERT
目的。如果你设置
record.key
它将尝试从Kafka消息密钥中获取密钥字段。除非消息键中确实有值,否则这不是您要使用的设置。这些可能会进一步帮助您:
?https://rmoff.dev/kafka-jdbc-video
?https://rmoff.dev/ksqldb-jdbc-sink-video