我在使用debezium change data capture在mysql中捕获数据并使用kafka connect jdbc sink将其消费到另一个mysql时遇到了问题。
因为debezium为kafka主题生成的模式和负载与kafka connect jdbc sink期望的模式不兼容。
当jdbcsink想要在另一个mysql中使用数据并创建记录时,我遇到了一个异常。
我该如何解决这个问题?
我在使用debezium change data capture在mysql中捕获数据并使用kafka connect jdbc sink将其消费到另一个mysql时遇到了问题。
因为debezium为kafka主题生成的模式和负载与kafka connect jdbc sink期望的模式不兼容。
当jdbcsink想要在另一个mysql中使用数据并创建记录时,我遇到了一个异常。
我该如何解决这个问题?
1条答案
按热度按时间23c0lvtd1#
debezium生成的消息结构确实不同于jdbc接收器预期的消息结构。jdbc接收器期望消息中的每个字段对应于行中的一个字段,因此消息对应于行的“after”状态。另外,debeziummysql连接器执行更改数据捕获,这意味着它不仅仅包括行的最新状态。具体来说,连接器输出消息,其中键包含行的主键列或唯一键列,消息值包含信封结构,其中:
操作,例如是插入、更新还是删除
发生更改之前行的状态(插入时为空)
发生更改后行的状态(删除时为空)
特定于源的信息,包括服务器元数据、事务id、数据库和表名称、事件发生时的服务器时间戳,以及有关在何处发现事件的详细信息等。
连接器生成事件的时间戳
解决这种差异的最简单方法是使用kafka 0.10.2.x(目前最新版本是0.10.2.1)和kafka connect的新单消息转换(smts)。每个kafka connect连接器都可以配置由零个或多个smt组成的链,这些smt可以在将消息写入kafka之前转换源连接器的输出,或者在将从kafka读取的消息作为输入传递到接收器连接器之前转换这些消息。SMT有意非常简单,只处理一条消息,绝对不应该访问外部资源或保持任何状态,因此不能替代Kafka流或其他功能强大得多的流处理系统,可以连接多个输入流,并且可以执行非常复杂的操作并跨多个消息维护状态。
如果您使用kafka流来执行任何类型的处理,那么您应该考虑在kafka流应用程序中操纵消息结构。如果没有,那么SMT是解决问题的好方法。事实上,有两种方法可以使用SMT来调整消息结构。
第一种选择是使用带有debezium连接器的smt来提取/保留行的“after”状态,并在将其写入kafka之前丢弃所有其他信息。当然,你会在Kafka主题中存储更少的信息,并丢弃一些在未来可能有价值的疾病预防控制中心信息。
第二个也是imo首选的选项是保持源连接器的原样,并保留kafka主题中的所有cdc消息,但是在将消息传递到jdbc sink连接器之前,将smt与sink连接器一起使用,以提取/保留行的“after”状态,并丢弃所有其他信息。您可能可以使用kafka connect中包含的现有smt之一,但您可以考虑编写自己的smt,以便完全按照您的要求进行操作。