我目前正在处理一个批处理,该批处理使用来自具有数百万行的大型SQL数据库的数据。
它在处理器中进行一些处理,包括通过带有连接的大型sql查询对从读取器检索到的行进行分组。
然后编写器将结果写入另一个数据表。
问题在于此批处理存在性能问题,因为Sql选择查询需要花费大量时间,并且这些步骤不是在多线程中执行的。
因此,我想以多线程方式运行它们,但问题是,这些步骤通过计算所有具有相同类型的行的总数来对行进行分组。
所以如果我把它放在多线程中,我怎么能做到呢,当每个分区都要在不同的线程中处理时,我知道有数百万行,我不能存储在上下文中,以便在步骤之后检索它们,并进行分组,而且我也不能将它们保存在数据库中,因为它有数百万行,你知道我怎么做吗?我希望我能很好地解释我的问题。提前感谢你的帮助
2条答案
按热度按时间b1zrtrql1#
我有一个类似的任务,像你的,不像我们使用java 1.7和spring 3.x。我可以提供一个配置在xml中,所以也许你将能够使用注解配置为此我没有尝试。
分区程序进行查询以计算行数并为每个线程创建块:
以下是读取器和写入器的配置:
也许有人知道如何用现代的Spring批/Spring Boot来转换这个
PS:不要使用太多线程,否则Spring批处理会浪费很多时间来填充它自己的表。你必须做一些基准测试来理解正确的配置
我还建议不要对数百万行使用jpa/hib,在我的例子中,我使用的是jdbcTemplate
EDIT有关注解配置,请参见此问题
使用分区程序进行配置的示例
swvgeqrz2#
我们有一个类似的使用案例,我必须从基于特定标准阅读数百万条记录开始,作为来自休息端点的输入,并使用20-30个线程并行处理这些记录,以满足极端的期限。但随后的挑战是,对数据库进行同样复杂的查询,然后进行分区,以在生成的线程之间共享。
一个典型的批处理过程的目标是-〉读取、进行一些http调用/操作数据,并将其写入响应日志表。
Spring批提供了跟踪已处理记录的功能,以便可以启动重新启动,以挑选剩余批次进行处理。另一种方法是在主表中设置一个标志,将记录标记为已处理,以便在重新启动期间不必挑选。
面临的多重挑战包括:
假设您有10000条记录,需要并行处理5条记录。
可以实施多种创造性解决方案,但最常用的两种适合所有使用情形的解决方案是
考虑到机器所能提供的内存,可以选择合适的线程数。例如,每个线程可以处理2000条记录。
分割是分割范围的行程,并允许每个步骤执行行程在自己的执行绪中选取并执行。对于上述步骤,我们需要分割这些范围,并在查询执行时传递它,让它撷取范围的记录,并在个别的执行绪中继续行程。
另一种划分逻辑是将线程0分配到4,并将查询基分配为该数字的模。但这种方法的一个缺点是,一个特定的范围将接收比其他范围更多的负载,而前一种方法将确保每个人都得到公平的份额。
拆分的数据将被传递到单独的线程,该线程将开始处理该数据,并在步骤中提到的提交间隔(块大小)写入数据。
编码:
示例数据读取器-〉param 1是用户想要输入的任何参数。modulo是一个步骤执行参数-从Partitioner对象传递。
如果用于模5,则Paritator对象将具有模0| 1| 2| 3| 4,这将产生5个线程,这些线程将与读取器交互并为划分的集合获取数据。
响应写入器,用于记录对任何表的响应,以记录处理的数据,用于分析或业务报告。
将写入数据操作的核心逻辑的处理器。返回的响应属于数据写入器所需的类型。
否则可能遇到这样的异常:
(java.lang.Thread.run:829)[?:?]导致原因:异常错误:准备好的语句回调; SQL [将值(?,?,?,?)插入批作业示例(作业示例标识,作业名称,作业关键字,版本)]; ORA-08177:无法序列化此事务处理访问;嵌套的异常是java.sql.SQLException:ORA-08177:无法序列化此事务处理访问
列范围分区程序可以创建为:
我们的工作将集中于执行步骤1-它将根据提供的分区器(这里是columnrange分区器)产生线程来处理该步骤。
网格大小是指并行线程数(使用计算模数);
每一个processStep步骤都是一系列阅读指定模数的特定线程的数据、处理数据然后写入数据的过程。
这可能是一个常见的Spring批处理解决方案,但也适用于涉及常用的基于SQL DB/ java的解决方案的每个迁移需求。
避免再次执行联接查询,然后进行过滤。复杂的联接可能会影响数据库性能。因此,一个更好的解决方案是提取数据一次,然后在内部拆分它。应用程序使用的内存会很大,哈希Map将填充您的查询将提取的所有数据,但java能够处理这些。可以将获取的数据传递给ListItemReader,以便并行处理该特定线程的数据列表。
对于处理并行请求(不是线程,而是对该应用程序的并行API调用),可以进行修改,以便只处理一次某个查询,使用信号量保持对该查询的锁定,以便其他线程等待该查询。一旦释放锁定,这些等待线程将发现数据存在,并且数据库将不会再次被查询。
以上实现的代码对于这个博客范围来说可能会很复杂。请随意询问您的应用程序是否需要任何用例。
我很乐意解决任何关于同样的问题。请随时联系我(Akshay),地址是gmail.com,或联系我的同事(萨加尔),地址是sagarnagdev61@gmail.com