我已经成功地将一个数据库复制到另一个数据库中,其中一个Kafka集群通过Debezium Mysql连接器连接到源数据库,而Sink数据库由Debezium JDBC连接器写入。
到目前为止,我已经能够在upsert模式下完全复制数据库,但当然,这会导致在重置Kafka集群时记录重复,因此我想在upsert模式下配置Sink Connector。
棘手的部分是表有点大,因此手动定义包含id的字段是不切实际的,并且不是每个表都定义了键。
因此,这是我根据设置**“primary.key.mode”**得到的错误:
*record.key:当连接器到达一个没有定义id的表时,它会崩溃,因为表的主题中的键的消息为null
*Kafka:它有点工作,但这会导致在源表中创建三个额外的列,这是不可取的
*record_value:连接器立即崩溃,要求我定义primary.key.field。也许我误解了官方文档,但我不期望这种行为。
那么,我是不是运气不好,不应该指望Debezium自己巧妙地处理这些情况?
1条答案
按热度按时间3pvhb19x1#
record.key:当连接器到达一个没有定义id的表时,它会崩溃,因为表的主题中的键的消息为null
对于任何JDBC接收器连接器来说,从多个主题中使用数据集可能具有与连接器配置不同的不一致键结构是不期望的行为。当您遇到这种情况时,你设置了1个接收器连接器,在那里你会在主题中有记录,这些记录有使用
record_key
的键,你会使用一个完全独立的连接器,在那里你可能有一个无键的表,在那里你也许可以使用record_value
来定义目标表的代理键设置。Kafka:它有点工作,但这会导致在源表中创建三个额外的列,这是不可取的
将
schema.evolution
设置为none
后,您或您的DBA应该负责确保目标表的结构与数据集一致。record_value:连接器立即崩溃,要求我定义primary. key. field。也许我误解了官方文档,但我不期望这种行为。
您应该能够在此上下文中使用
record_value
,这意味着值的Struct
中的所有列都被视为主键字段;但是,您必须小心这种设置,特别是如果您的值字段中有特定的数据类型,这些数据类型可能对您的目标数据库的主键无效,例如TEXT
或LOB列等。如果你遇到了
record_value
的问题,你只有VARCHAR和NUMERIC列,并被告知你需要指定primary.key.fields
,那么我会说这是一个回归,请打开一个Jira,这样我们就可以纠正它。