java 如何等待几个可调用任务完成后再执行

osh3o9ms  于 2022-12-21  发布在  Java
关注(0)|答案(1)|浏览(175)

我想在两个线程中执行一些处理器。它们中的一些是独立的,可以随时运行,但它们中的一些具有依赖性。每当执行顺序到达该处理器时,我想检查是否所有以前的可调用任务都已执行?并且将来的任务应该在当前执行后执行。
下面是主线程方法

PackageExportGraph executeMultiThread(PackageExportGraph exportGraphInp, PackageExportContext exportContextnInp)
            throws WTException {
        Map<PackageExportDependencyProcessor, Boolean> processorToParallelExecutionMap = new LinkedHashMap<>();
        this.processorQueue = new LinkedBlockingQueue<>();

        ExecutorService execService = null;
        try {
           
            int threads = 2;// 2

            countDownLatch = new CountDownLatch(threads);
            execService = ExecutorServiceFactory.getDefault().newExecutorService(threads, true);
            boolean isThread1Started = false;
            ThreadedDepProcessor thread1 = new ThreadedDepProcessor(
                    exportGraphInp, countDownLatch,
                    processorToParallelExecutionMap, processorQueue, exportContextnInp, isThread1Started);
            threadList.add(thread1);
            thread1.addListener(this);
            boolean isThread2Started = false;

            ThreadedDepProcessor thread2 = new ThreadedDepProcessor(
                    exportGraphInp, countDownLatch,
                    processorToParallelExecutionMap, processorQueue, exportContextnInp, isThread2Started);
            threadList.add(thread2);
            thread1.addListener(this);
            List<Future<LinkedBlockingQueue>> futureList = new ArrayList<>();

            for (ThreadedDepProcessor thread : threadList) {
                Future f = execService.submit(thread);
                System.out.println("f " + f);
                futureList.add(f);
            }

            int currentidx = 0;
            for (PackageExportDependencyProcessor processor : origOrderedList) {
                if (!processorToParallelExecutionMap.containsKey(processor)) {

                    System.out.println(" parallel threadStatusMap values 1 - " + threadStatusMap.values());
                    System.out.println("Adding parallel - " + processor);
                    if (currentidx > 0) {
                        while (threadStatusMap.containsValue(false)) {

                            System.out.println("Waiting");
                            System.out.println("threadStatusMap values - " + threadStatusMap.values());
                            Thread.sleep(1000);
                        }
                        Thread.sleep(2000);

                        // execService.awaitTermination(5, TimeUnit.SECONDS);
                        System.out.println("Size - " + futureList.size());
                        for (Future f : futureList) {

                            System.out.println("futureList is done " + f.isDone());
                            System.out.println("Getting future Object");
                            if (f.isDone()) {
                                continue;
                            }

                            Object o = f.get(10, TimeUnit.SECONDS);
                            System.out.println(o);
                           
                            /*
                             * Object object = f.get(10, TimeUnit.SECONDS); System.out.println("Obj " + object);
                             */
                        }
                        processorQueue.put(processor);
                        Thread.sleep(2000);
                    }
                    else {
                        processorQueue.put(processor);
                        Thread.sleep(2000);
                        System.out.println("Size - " + futureList.size());
                        for (Future f : futureList) {
                            System.out.println("futureList is done " + f.isDone());
                            System.out.println("Getting future Object");
                            if (f.isDone()) {
                                continue;
                            }
                            Object o = f.get(10, TimeUnit.SECONDS);
                            System.out.println(o);

                            /*
                             * Object object = f.get(10, TimeUnit.SECONDS); System.out.println("Obj " + object);
                             */
                        }
                        // execService.awaitTermination(5, TimeUnit.SECONDS);
                        while (threadStatusMap.containsValue(false)) {
                            System.out.println("Waiting");
                            System.out.println("threadStatusMap values - " + threadStatusMap.values());
                            Thread.sleep(1000);
                        }
                    }

                    for (ThreadedDepProcessor thread : threadList) {
                        System.out.println("Finished adding dependents" + thread.finishedAddingDependents.get());
                    }
                }
                else {
                    System.out.println("Adding non-parallel - " + processor);
                    processorQueue.put(processor);
                }
                currentidx++;
            }
        } catch (WTException | RuntimeException exc) {
            if (Objects.nonNull(execService)) {
                execService.shutdown();
            }
            throw exc;
        } catch (Exception exc) {
            throw new WTException(exc);
        } finally {
            System.out.println("shutting down");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            execService.shutdown();
        }
        return exportGraphInp;

    }

这个是可调用的

@Override
        public LinkedBlockingQueue call() throws WTException, InterruptedException {
            try {
                System.out.println("Started - ");
                isThreadStarted = true;
                while (!processorQueue.isEmpty()) {
                    nextEligible = processorQueue.take();
                    if (Objects.isNull(nextEligible)) {
                        finishedAddingDependents.set(true);
                        break;
                    }
                    System.out.println("calling addDependentObjects for processor - " + nextEligible);
                    nextEligible.addDependentObjects(exportGraph, exportContext);
                    nextEligible = null;
                    // notifyListeners();
                }

            } catch (Exception e) {
                System.out.println("Error occured " + e);
                e.printStackTrace();
                return processorQueue;
            } finally {
                countDownLatch.countDown();
                System.out.println("countDownLatch now - " + countDownLatch.getCount());

            }
            return processorQueue;
        }
    }

我试图检查while(future.isDone()),但它进入了无限循环。我想检查线程/可调用执行是否已启动。
如果启动,则在执行串行处理器时,我希望等待所有现有处理器执行完毕,然后开始执行,并等待其执行,不选择下一个

wz3gfoph

wz3gfoph1#

我所做的是管理一个同步的收集,它将让我们知道每个处理器的执行状态,并基于此我们可以等待或继续

相关问题