我正试图处理一个相对巨大的 Stream
的 List
在多线程中,使用 ExecutorService
. 这个方法看起来像这样。
public void initMigration() {
ExecutorService executorService = Executors.newCachedThreadPool();
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
streamOfLists.forEach(record4List -> {
Runnable runnable = () -> {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
};
executorService.submit(runnable);
});
}
LOGGER.info("Shutting down the ExecutorService");
executorService.shutdown();
}
基本上,我要做的是 List
在 Stream
,一个 Runnable
正在创建并提交给 ExecutorService
. 它似乎工作正常。但是,我现在真正想做的是,看看有没有办法 ExecutorService
跑第一个 Runnable
从第一个 List
在 Stream
同时阻止其他人 Runnables
直到它执行,并继续运行其他 Runnables
(并行)之后。真的需要一些帮助。
2条答案
按热度按时间hyrbngr71#
@阿列克谢的方法是(imo)解决这个问题的正确方法。别挡着跑。而是在运行runnables的前提条件已经满足时提交它们。
让一个可运行的块与其他块的问题是,你很容易堵塞执行器的线程池与任务被阻止等待另一个任务完成。实际上,如果线程池是有界的,那么您甚至可能会遇到这样的情况:所有线程都处于这种状态,并且执行器无法启动将取消阻止它们的任务。结果:死锁!
如果您仍然想阻止runnable(尽管有上述内容),那么可以使用
CountDownLatch
.在示例化
Runnable
s、 创建CountDownLatch
初始计数器为的示例1
. 此示例必须由所有Runnable
s。代码一
Runnable
这样它就能得到一个List
,处理它,然后调用latch.count()
;请稍等
Runnable
打电话latch.await()
然后提取并处理List
.使用第一个任务提交一个任务
Runnable
剩下的用第二个。vsnjm48y2#
您可以获取第一个runnable,执行它,然后提交其他runnable。
哪里