SQLServer—从20个相关表(通过id)中获取数据,将它们组合到一个json文件中,并利用SpringBatch实现这一点

hk8txs48  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(293)

我在sql server中有一个人数据库,其中有地址、许可证、亲属等表,大约有20个。所有表都具有每个人唯一的id参数。这些表中有数百万条记录。我需要使用公共id参数组合这些个人记录,并将其转换为带有一些列名更改的json表文件。这个json文件然后通过生产者推送到kafka。如果我能以kafka生产者作为item writer得到这个例子的话——很好,但真正的问题是理解如何利用spring批处理item reader、processor和item writer来创建复合json文件的策略和细节。这是我的第一个spring批处理应用程序,所以我对这个比较陌生。
我希望得到关于使用复合读取器或处理器的实现策略的建议,该读取器或处理器使用personid作为游标,并使用每个表的id查询每个表,将结果记录转换为json,并将其聚合为一个复合的关系json文件,其中根表persondata提供给kafka集群。
基本上我有一个数据源,为读者提供相同的数据库。我计划使用person表获取id和其他对person唯一的记录,并使用id作为其他19个表的where子句。将表中的每个resultset转换为json,最后合成json对象并写入kafka。

2j4z5cfb

2j4z5cfb1#

我们进行了类似的练习,将多个表中的100mn+行作为json的一种形式进行迁移,以便将其发布到消息总线。
其思想是创建一个视图,对数据进行非规范化,并使用jdbcpagingitemreader从该视图中读取数据。从一个源读取数据的开销较小。
反规范化数据时,请确保主表中没有多行。
示例-sql server-

create or alter view viewName as
select master.col1 , master.col2,
(select dep1.col1,
               dep1.col2
               from dependent1 dep1
        where dep1.col3 = master.col3 for json path
       )                as dep1
from master master;

上面的内容将为您提供json字符串中的依赖表数据,每个主表数据对应一行。一旦检索到数据,就可以使用gson或jackson将其转换为pojo。
我们试图避免使用jdbccursoritemreader,因为它将从内存中提取所有数据,并从中逐个读取数据。它不支持分页。

sbtkgmzw

sbtkgmzw2#

我们在一个项目中有这样一个需求,并用以下方法解决了它。
在并行运行的splitflow中,我们有一个step for ever表,它将表的数据加载到文件中,并按公共id排序(这是可选的,但如果文件中有数据,则测试更容易)。
然后我们实现了自己的“mergereader”。这个mergereader对每个文件/表都有flatfileitemreaders(我们称之为datareaders)。所有这些flatfileitemreaders都用一个singleitempeakeableitemreader Package 。mergereader的read方法的逻辑如下:

public MyContainerPerId read() {

   // you need a container to store the items, that belong together
   MyContainerPerId container = new MyContainerPerId();

   //  peek through all "dataReaders" to find the lowest actual key
   int lowestId = searchLowestKey();

   for (Reader dataReader : dataReaders) {
       // I assume, that more than one entry in a table can belong to
       // the same person id
       wihile (dataReader.peek().getId() == lowestId) {
       {
             container.add(dataReader.read());
       }
   }

   // the container contains all entries from all tables
   // belonging to the same person id    
   return container;
}

如果您需要重新启动功能,您可以以某种方式实现itemstream,它可以跟踪每个datareader的当前readposition。

e0bqpujr

e0bqpujr3#

我使用这里描述的基于驱动查询的itemreaders使用模式来解决这个问题。
reader:只是jdbccursoritemreader的一个默认实现,使用sql获取
唯一的关系id(例如,从person-中选择id)
processor:使用这个长id作为输入,我使用spring的jdbctemplate实现的dao通过查询每个表获取特定id的数据(例如,select*from license where id=),并将结果以列表格式Map到person的pojo,然后转换为json对象(使用jackson),然后转换为字符串
writer:如果使用kafka,可以用json字符串写出文件,也可以将json字符串发布到主题中

相关问题