我开始和Kafka玩,从数据库导入数据时,我脑子里有很多问题。
我的场景是:我有一个带有几个表的关系数据库,我想将它们导入到kafka主题中。我脑子里的困惑是:如果我使用kafkaconnect导入所有这些表,那么我的客户如何知道数据何时都已加载?在我的使用者开始工作之前,我需要将不同表中的大部分记录加载到kafka主题中。
例如,我的数据库中有一个customer和一个order表。我想将这两个表导入到客户和订单主题中。如果我的消费者因为有新订单而启动,但有关客户的信息尚未出现在客户主题中,则它将不起作用。我把事情复杂化了吗?
3条答案
按热度按时间0md85ypi1#
Kafka提供了一个随时间变化的实时事件流。重新。您的问题“我的消费者如何知道数据何时全部加载”——您询问的是某个有限活动(将数据加载到kafka?)的完成情况,但随着时间的推移,事件不断发生。您的streams应用程序不断地运行并继续运行—没有“结束”。
你现在对表格中的数据和Kafka主题中的事件的看法听起来像是把它们当作两个等价的概念,但事实并非如此。
gt0wga4j2#
如果我使用kafka connect导入所有这些表,那么我的用户如何知道数据何时都已加载?
正如其他答案所说,它们不会跳出盒子。
您需要手动(或以编程方式)监视导入过程,或者确定“完整性”的初始条件,或者至少确定您有信心启动消费者的初始条件。
从那以后,我建议要么建立cdc,要么让客户和订单服务直接写进Kafka的主题。如果您直接写信给Kafka,您将需要考虑幂等事件,例如新客户帐户的排序、编辑和删除。例如,如果客户被删除,您是否仍保留与该客户关联的所有订单?
一旦两个数据源都包含在主题中,欢迎您将客户订单与kstreams/ksql结合起来
这是行不通的
您可能想澄清“it”是什么,但至少有一个主题的消费者可以使用。在导入数据时,上面提到的连接最终是一致的,但这是一种折衷,因为在异步处理中没有事务性的原子语义
yc0p9oo03#
从技术上讲:您需要为生产者和消费者提供同步机制。kafka不提供让消费者知道“主题已满”的功能(因为可能总是有人在写主题)。
生产者需要以某种方式(例如,通过另一个Kafka主题?)告知消费者“他们已经完成了他们的阶段”。只有在消费者收到该信息后,他们才能开始处理。
旁白:你提到
你现在是怎么解决这个问题的?