spring-data-jpa 正在将数据上传到Kafka制作者

z31licg0  于 2022-11-10  发布在  Spring
关注(0)|答案(2)|浏览(123)

我是新的Kafka在春 Boot ,我已经通过了许多教程,并得到了公平的知识约相同。
目前我被分配了一个任务,我面临着一个问题。希望在这里得到一些帮助。
方案如下所示。
1)我有一个数据库,它不断更新数百万的数据。
2)我必须每5分钟后点击DB,挑选最近更新的数据并将其发送给Kafka。

Condition-在上一次迭代中选择的旧数据不应在下一次DB调用和Kafka推送中选择。

我已经完成了Spring Scheduling的部分,通过使用Sping Boot JPA的findAll()来挑选数据,但是我如何编写逻辑,以便它不挑选旧的DB记录,而只是获取新的记录并将其推送到Kafka。
我的数据库表还有一个名为“Recent_timeStamp”的字段,类型为“datetime”

9rnv2umw

9rnv2umw1#

如果不真正了解您的逻辑和使用数据库的方式,很难判断,但是根据您所描述的,您应该在这里只执行“findAll”。相反,您应该将DB表视为时间驱动的数据:

  • 由于它具有时间戳字段,因此请确保它具有索引
  • 不要使用“findAll”,而是执行以下命令:
SELECT <...>
   FROM <YOUR_TABLE>
   WHERE RECENT_TIMESTAMP > ?
   ORDER BY RECENT_TIMESTAMP ASC

在本例中,您将获得按递增时间戳排序的记录。现在,?表示您处理的最后一个记忆的时间戳
所以你必须维持这里的状态
另一个选项是查询时间戳“小于”5分钟的数据,在这种情况下,查询将如下所示(由于实际语法不同,因此为伪代码):

SELECT <...>
   FROM <YOUR_TABLE>
   WHERE RECENT_TIMESTAMP < now() - 5 minutes
   ORDER BY RECENT_TIMESTAMP ASC

第一种方法更健壮,因为如果你的spring Boot 应用程序因为某种原因“关闭”了,你可以从它发送数据失败的点恢复和查询所有的记录。另一方面,你必须把这种指针保存在某种类型的永久存储器中。第二种解决方案“更容易”,因为你不需要“关闭”数据。I don“我没有要维护的状态,但另一方面,在重新启动后,您将丢失数据。
在这两种情况下,您可能都希望使用某种分页,因为基本上您不知道将从数据库中获取多少条记录,如果记录量超过了内存限制,应用程序将以抛出OutOfMemory错误结束。
另一种完全不同的方法是,当你向数据库写入数据而不是从数据库读取数据时,将数据扔给Kafka。此时,你可能会有一个大小(可能)有限的数据块,通常你不需要状态,因为你可以存储到数据库,并从同一个服务发送给kafka,如果你的应用程序的架构允许这样做的话。

b4wnujal

b4wnujal2#

您可以查看kafka connect component,如果它符合您的目的。
Kafka Connect是一个在Apache Kafka®和其他数据系统之间进行可扩展和可靠的数据流传输的工具。它使快速定义将大型数据集移入和移出Kafka的连接器变得简单。Kafka Connect可以将整个数据库或从所有应用服务器收集指标放入Kafka主题中。导出连接器可以将数据从Kafka主题传递到二级索引,或者导入到批处理系统(如Hadoop)中进行离线分析。

相关问题