我是新的Kafka在春 Boot ,我已经通过了许多教程,并得到了公平的知识约相同。
目前我被分配了一个任务,我面临着一个问题。希望在这里得到一些帮助。
方案如下所示。
1)我有一个数据库,它不断更新数百万的数据。
2)我必须每5分钟后点击DB,挑选最近更新的数据并将其发送给Kafka。
Condition-在上一次迭代中选择的旧数据不应在下一次DB调用和Kafka推送中选择。
我已经完成了Spring Scheduling的部分,通过使用Sping Boot JPA的findAll()来挑选数据,但是我如何编写逻辑,以便它不挑选旧的DB记录,而只是获取新的记录并将其推送到Kafka。
我的数据库表还有一个名为“Recent_timeStamp”的字段,类型为“datetime”
2条答案
按热度按时间9rnv2umw1#
如果不真正了解您的逻辑和使用数据库的方式,很难判断,但是根据您所描述的,您应该在这里只执行“findAll”。相反,您应该将DB表视为时间驱动的数据:
在本例中,您将获得按递增时间戳排序的记录。现在,
?
表示您处理的最后一个记忆的时间戳所以你必须维持这里的状态
另一个选项是查询时间戳“小于”5分钟的数据,在这种情况下,查询将如下所示(由于实际语法不同,因此为伪代码):
第一种方法更健壮,因为如果你的spring Boot 应用程序因为某种原因“关闭”了,你可以从它发送数据失败的点恢复和查询所有的记录。另一方面,你必须把这种指针保存在某种类型的永久存储器中。第二种解决方案“更容易”,因为你不需要“关闭”数据。I don“我没有要维护的状态,但另一方面,在重新启动后,您将丢失数据。
在这两种情况下,您可能都希望使用某种分页,因为基本上您不知道将从数据库中获取多少条记录,如果记录量超过了内存限制,应用程序将以抛出
OutOfMemory
错误结束。另一种完全不同的方法是,当你向数据库写入数据而不是从数据库读取数据时,将数据扔给Kafka。此时,你可能会有一个大小(可能)有限的数据块,通常你不需要状态,因为你可以存储到数据库,并从同一个服务发送给kafka,如果你的应用程序的架构允许这样做的话。
b4wnujal2#
您可以查看kafka connect component,如果它符合您的目的。
Kafka Connect是一个在Apache Kafka®和其他数据系统之间进行可扩展和可靠的数据流传输的工具。它使快速定义将大型数据集移入和移出Kafka的连接器变得简单。Kafka Connect可以将整个数据库或从所有应用服务器收集指标放入Kafka主题中。导出连接器可以将数据从Kafka主题传递到二级索引,或者导入到批处理系统(如Hadoop)中进行离线分析。