如何使用parallelstream()并行化namedparameterjdbctemplate调用?

bkhjykvo  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(459)

使用springframework,我希望迭代一个long(id)数组并生成一个 select 查询数据库sql。查询具有 IN (...) 子句来放置ID,但限制为1000项。所有内容的数组限制为5000项,因此代码按时间将其分组为1000项,我希望使其并行。问题是,在实践中,它是串行执行的,而不是并行执行的。
代码:

@Transactional
    public List<GenericRecord> process(final Collection<Long> idsList) {
        final AtomicInteger counter = new AtomicInteger();
        final StopWatch tookTotal = new StopWatch();
        tookTotal.start();

        idsList.parallelStream().collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 1000)).values().parallelStream().forEach(lot -> {
            /*
                 lot -> Array[Long] (1000)
             */
            final StopWatch took = new StopWatch();
            took.start();
            List<GenericRecord> fetchedItems = fetchItemsFromIds(lot);
            took.stop();
            LOGGER.info("M=processTransactions, SELECT QUERY => Count: {}, Took: {}ms", fetchedItems.size(), took.getLastTaskTimeMillis());
        });

        tookTotal.stop();
        LOGGER.info("M=process, TotalSize: {}, TookTotal: {}ms", idsList.size(), tookTotal.getLastTaskTimeMillis());

        return new ArrayList<>(); // just to works.
    }

    private List<GenericRecord> fetchItemsByIds(final Collection<Long> idsList) {
        final MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
        mapSqlParameterSource.addValue("ids", idsList);
        return namedParameterJdbcTemplate.query(searchQuery, mapSqlParameterSource, new MyGenericRecordRowMapper());
    }

输出:

SELECT QUERY => Count: 1001, Took: 5231ms
SELECT QUERY => Count: 1000, Took: 5901ms
SELECT QUERY => Count: 1000, Took: 5290ms
SELECT QUERY => Count: 1000, Took: 4818ms
SELECT QUERY => Count: 1000, Took: 5358ms
M=process, TotalSize: 5000, TookTotal: 21373ms

显然,它不是并行执行,它的外观是在序列上执行的。或者,我对此一无所知。我知道它有jdbc连接的限制,但应该超过5个。
ps1:我不知道为什么第一个选择get 1001 items而不是1000,但到目前为止还不是问题。ps2:目标是提高性能和速度。

lyr7nygr

lyr7nygr1#

根据您发布的数据,似乎有一些并行性,甚至有一些性能提升:5231ms+5901ms+5290ms+4818ms+5358ms=26598ms>21373ms。
请注意,由于使用并行方法有一个开销,所以计时是实际并行执行的一个非常差的指标,其他一些工具更适合。 parallelStream() 并不保证返回一个并行流(在您的案例中似乎是这样),据我所知,它更倾向于作为一种简单的并行化方法,而不是作为一种有效的方法。根据具体情况,使用线程和线程池可以获得更好的改进,或者根本没有改进。

相关问题