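I want to execute a number of processors on two threads. Some of them are independent and can run at any time, but some have dependencies. Whenever execution reaches a dependent processor, I want to check whether all previously submitted callable tasks have finished, and any later tasks should run only after the current one has finished.
Below is the main thread method: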
    PackageExportGraph executeMultiThread(PackageExportGraph exportGraphInp, PackageExportContext exportContextnInp)
            throws WTException {
        Map<PackageExportDependencyProcessor, Boolean> processorToParallelExecutionMap = new LinkedHashMap<>();
        this.processorQueue = new LinkedBlockingQueue<>();
        ExecutorService execService = null;
        try {
            int threads = 2;
            countDownLatch = new CountDownLatch(threads);
            execService = ExecutorServiceFactory.getDefault().newExecutorService(threads, true);
            boolean isThread1Started = false;
            ThreadedDepProcessor thread1 = new ThreadedDepProcessor(
                    exportGraphInp, countDownLatch,
                    processorToParallelExecutionMap, processorQueue, exportContextnInp, isThread1Started);
            threadList.add(thread1);
            thread1.addListener(this);
            boolean isThread2Started = false;
            ThreadedDepProcessor thread2 = new ThreadedDepProcessor(
                    exportGraphInp, countDownLatch,
                    processorToParallelExecutionMap, processorQueue, exportContextnInp, isThread2Started);
            threadList.add(thread2);
            // Was thread1.addListener(this), which registered the listener on thread1 twice.
            thread2.addListener(this);
            List<Future<LinkedBlockingQueue>> futureList = new ArrayList<>();
            for (ThreadedDepProcessor thread : threadList) {
                Future f = execService.submit(thread);
                System.out.println("f " + f);
                futureList.add(f);
            }
            int currentidx = 0;
            for (PackageExportDependencyProcessor processor : origOrderedList) {
                if (!processorToParallelExecutionMap.containsKey(processor)) {
                    System.out.println(" parallel threadStatusMap values 1 - " + threadStatusMap.values());
                    System.out.println("Adding parallel - " + processor);
                    if (currentidx > 0) {
                        // Poll until every worker reports that it is idle.
                        while (threadStatusMap.containsValue(false)) {
                            System.out.println("Waiting");
                            System.out.println("threadStatusMap values - " + threadStatusMap.values());
                            Thread.sleep(1000);
                        }
                        Thread.sleep(2000);
                        // execService.awaitTermination(5, TimeUnit.SECONDS);
                        System.out.println("Size - " + futureList.size());
                        for (Future f : futureList) {
                            System.out.println("futureList is done " + f.isDone());
                            System.out.println("Getting future Object");
                            if (f.isDone()) {
                                continue;
                            }
                            Object o = f.get(10, TimeUnit.SECONDS);
                            System.out.println(o);
                            /*
                             * Object object = f.get(10, TimeUnit.SECONDS); System.out.println("Obj " + object);
                             */
                        }
                        processorQueue.put(processor);
                        Thread.sleep(2000);
                    }
                    else {
                        processorQueue.put(processor);
                        Thread.sleep(2000);
                        System.out.println("Size - " + futureList.size());
                        for (Future f : futureList) {
                            System.out.println("futureList is done " + f.isDone());
                            System.out.println("Getting future Object");
                            if (f.isDone()) {
                                continue;
                            }
                            Object o = f.get(10, TimeUnit.SECONDS);
                            System.out.println(o);
                            /*
                             * Object object = f.get(10, TimeUnit.SECONDS); System.out.println("Obj " + object);
                             */
                        }
                        // execService.awaitTermination(5, TimeUnit.SECONDS);
                        while (threadStatusMap.containsValue(false)) {
                            System.out.println("Waiting");
                            System.out.println("threadStatusMap values - " + threadStatusMap.values());
                            Thread.sleep(1000);
                        }
                    }
                    for (ThreadedDepProcessor thread : threadList) {
                        System.out.println("Finished adding dependents" + thread.finishedAddingDependents.get());
                    }
                }
                else {
                    System.out.println("Adding non-parallel - " + processor);
                    processorQueue.put(processor);
                }
                currentidx++;
            }
        } catch (WTException | RuntimeException exc) {
            if (Objects.nonNull(execService)) {
                execService.shutdown();
            }
            throw exc;
        } catch (Exception exc) {
            throw new WTException(exc);
        } finally {
            System.out.println("shutting down");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Guard against a null executor if creation failed before assignment.
            if (Objects.nonNull(execService)) {
                execService.shutdown();
            }
        }
        return exportGraphInp;
    }
And this is the Callable:
        @Override
        public LinkedBlockingQueue call() throws WTException, InterruptedException {
            try {
                System.out.println("Started - ");
                isThreadStarted = true;
                while (!processorQueue.isEmpty()) {
                    nextEligible = processorQueue.take();
                    if (Objects.isNull(nextEligible)) {
                        finishedAddingDependents.set(true);
                        break;
                    }
                    System.out.println("calling addDependentObjects for processor - " + nextEligible);
                    nextEligible.addDependentObjects(exportGraph, exportContext);
                    nextEligible = null;
                    // notifyListeners();
                }
            } catch (Exception e) {
                System.out.println("Error occurred " + e);
                e.printStackTrace();
                return processorQueue;
            } finally {
                countDownLatch.countDown();
                System.out.println("countDownLatch now - " + countDownLatch.getCount());
            }
            return processorQueue;
        }
    }
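I tried checking with while (future.isDone()), but it goes into an infinite loop. I want to check whether the thread/callable execution has started.
If it has started, then when a serial processor comes up, I want to wait for all existing processors to finish, then start the serial one and wait for it to complete, without picking up the next one.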
1 Answer
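What I did was maintain a synchronized collection that tells us the execution status of each processor; based on that, we can either wait or proceed.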
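
The answer gives no code, so here is a minimal sketch of one way such a status collection might look. It is an illustration under assumptions, not the answerer's actual implementation: the class `ProcessorStatusDemo`, the `Status` enum, and the `submit`, `markDone`, and `awaitAllDone` helpers are all hypothetical names, processors are stood in for by plain `Runnable`s, and a standard fixed thread pool replaces `ExecutorServiceFactory`. The coordinator records each processor in a synchronized map before handing it to the pool, and a serial processor waits until every recorded entry is `DONE` both before it starts and before the next one is picked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ProcessorStatusDemo {

    /** Hypothetical execution states; not taken from the original code. */
    enum Status { QUEUED, RUNNING, DONE }

    // Synchronized collection shared by the coordinator and the workers.
    private final Map<String, Status> statusMap =
            Collections.synchronizedMap(new LinkedHashMap<>());
    private final List<String> log = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    /** Submits work; a serial processor waits for earlier work first, then blocks until it is done. */
    public void submit(String name, boolean serial, Runnable work) throws InterruptedException {
        if (serial) {
            awaitAllDone();                       // everything submitted so far must finish first
        }
        statusMap.put(name, Status.QUEUED);       // recorded before the worker can see it
        pool.execute(() -> {
            statusMap.put(name, Status.RUNNING);
            log.add("start " + name);
            try {
                work.run();
            } finally {
                log.add("end " + name);
                markDone(name);                   // always mark done, even if the work throws
            }
        });
        if (serial) {
            awaitAllDone();                       // do not pick the next processor until this one ends
        }
    }

    private void markDone(String name) {
        synchronized (statusMap) {                // same monitor the synchronized map uses internally
            statusMap.put(name, Status.DONE);
            statusMap.notifyAll();
        }
    }

    /** Blocks until every recorded processor is DONE; no sleep-based polling. */
    public void awaitAllDone() throws InterruptedException {
        synchronized (statusMap) {
            while (!statusMap.values().stream().allMatch(s -> s == Status.DONE)) {
                statusMap.wait();
            }
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs two parallel processors, one serial processor, then another parallel one. */
    public static List<String> runDemo() throws InterruptedException {
        ProcessorStatusDemo demo = new ProcessorStatusDemo();
        try {
            demo.submit("A", false, () -> pause(50));
            demo.submit("B", false, () -> pause(20));
            demo.submit("C", true, () -> pause(10));
            demo.submit("D", false, () -> pause(10));
            demo.awaitAllDone();
        } finally {
            demo.pool.shutdown();
        }
        return new ArrayList<>(demo.log);
    }

    public static void main(String[] args) throws InterruptedException {
        runDemo().forEach(System.out::println);
    }
}
```

In this sketch the relative order of A and B can vary from run to run, but C starts only after both have ended, and D starts only after C has ended. Waiting on the map's monitor with `wait`/`notifyAll` replaces the `Thread.sleep` polling loops in the question, and it avoids relying on `Future.isDone()`, which only becomes true once the whole callable has returned.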