kafka jdbc事务源配置

yc0p9oo0  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(352)

我正在尝试使用kafka connect从两个表中获取行。我配置了 connect-file-source.properties 以这种方式

name=jdbc_source_postgres_foobar_01
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=org.apache.kafka.connect.json.JsonConverter

# key.converter.schema.registry.url=http://localhost:8081

value.converter=org.apache.kafka.connect.json.JsonConverter

# value.converter.schema.registry.url=http://localhost:8081

connection.url=jdbc:postgresql://localhost:5432/store?user=postgres&password=root
table.whitelist=author,book
mode=incrementing
incrementing.column.name=id
validate.non.null=false
topics=author,book
topic.prefix=

涉及的表是 author 以及 book ,后者有外键引用 author .
然后我注册了一个侦听器来使用来自“author”和“book”主题的消息,以便将它们插入到另一个数据库中。

@KafkaListener(
    topics={"author","book"}, 
    groupId = "foo", 
    containerFactory = "fooKafkaListenerContainerFactory"
)
public void listenGroupFoo(@Payload PostgresTableRow message) {
    System.out.println("Received" + message);
    String tableName = message.tableName();
    HashMap<String, Object> params = message.params();

    insert(tableName, params);
}

当涉及的表彼此之间没有约束时,这种方法可以很好地工作,但在本例中,当“book”主题中的消息在“author”中的消息之前被使用时,我会遇到错误。
例如,我在source db中插入作者“george orwell”和 id=23 还有《1984》这本书 id=37 以及 authorId=23 ,两条消息被推入Kafka,一条在“作者”主题中,另一条在“书”主题中。如果消息先从“book”主题消费,然后从“author”主题消费,我会得到一个错误,即无法在我的sink db中插入id为37的书,因为不存在id为23的作者。
那我怎么解决这个问题呢?有没有一种方法可以将多个表推到一个主题中并授予顺序?

sy5wg1nm

sy5wg1nm1#

你面临着一个复杂的问题要解决的CDC(变化数据捕捉)世界与Kafka中间。
您希望以这样一种方式实现从数据库到kafka和从kafka到另一个数据库的事务一致、有序、一次复制:您所面临的引用完整性问题,即:由于竞争条件,不会发生。
我建议阅读罗宾·莫法特关于cdc和Kafka连接jdbc连接器的文章,以及肖恩·罗伯逊在Kafka峰会18上关于同一问题的演讲。
不再孤立:如何将数据库与apachekafka和cdc集成
kafka connect deep dive–jdbc源连接器
事务一致、有序、一次复制,从数据库复制到云中的kafka,并备份一个利用kafka提供端到端acid事务的解决方案
不幸的是,如果没有现成的端到端cdc解决方案,我想你要么需要非常有创意,要么投入相当多的精力来克服这个问题。

相关问题