使用springframework,我希望迭代一个long(id)数组并生成一个 select
查询数据库sql。查询具有 IN (...)
子句来放置ID,但限制为1000项。所有内容的数组限制为5000项,因此代码按时间将其分组为1000项,我希望使其并行。问题是,在实践中,它是串行执行的,而不是并行执行的。
代码:
@Transactional
public List<GenericRecord> process(final Collection<Long> idsList) {
final AtomicInteger counter = new AtomicInteger();
final StopWatch tookTotal = new StopWatch();
tookTotal.start();
idsList.parallelStream().collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 1000)).values().parallelStream().forEach(lot -> {
/*
lot -> Array[Long] (1000)
*/
final StopWatch took = new StopWatch();
took.start();
List<GenericRecord> fetchedItems = fetchItemsFromIds(lot);
took.stop();
LOGGER.info("M=processTransactions, SELECT QUERY => Count: {}, Took: {}ms", fetchedItems.size(), took.getLastTaskTimeMillis());
});
tookTotal.stop();
LOGGER.info("M=process, TotalSize: {}, TookTotal: {}ms", idsList.size(), tookTotal.getLastTaskTimeMillis());
return new ArrayList<>(); // just to works.
}
private List<GenericRecord> fetchItemsByIds(final Collection<Long> idsList) {
final MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
mapSqlParameterSource.addValue("ids", idsList);
return namedParameterJdbcTemplate.query(searchQuery, mapSqlParameterSource, new MyGenericRecordRowMapper());
}
输出:
SELECT QUERY => Count: 1001, Took: 5231ms
SELECT QUERY => Count: 1000, Took: 5901ms
SELECT QUERY => Count: 1000, Took: 5290ms
SELECT QUERY => Count: 1000, Took: 4818ms
SELECT QUERY => Count: 1000, Took: 5358ms
M=process, TotalSize: 5000, TookTotal: 21373ms
显然,它不是并行执行,它的外观是在序列上执行的。或者,我对此一无所知。我知道它有jdbc连接的限制,但应该超过5个。
ps1:我不知道为什么第一个选择get 1001 items而不是1000,但到目前为止还不是问题。ps2:目标是提高性能和速度。
1条答案
按热度按时间lyr7nygr1#
根据您发布的数据,似乎有一些并行性,甚至有一些性能提升:5231ms+5901ms+5290ms+4818ms+5358ms=26598ms>21373ms。
请注意,由于使用并行方法有一个开销,所以计时是实际并行执行的一个非常差的指标,其他一些工具更适合。
parallelStream()
并不保证返回一个并行流(在您的案例中似乎是这样),据我所知,它更倾向于作为一种简单的并行化方法,而不是作为一种有效的方法。根据具体情况,使用线程和线程池可以获得更好的改进,或者根本没有改进。