java 检查ConcurrentHashMap.forEach(PARALLELISM_THR,(k,v)->)是否已完成

pxiryf3j  于 2023-05-12  发布在  Java
关注(0)|答案(2)|浏览(108)

有没有办法知道并发hashmap上并行foreach是否已经结束
下面是我想做一个例子:

import java.util.concurrent.ConcurrentHashMap;
import java.util.Random;
public class Main
{
    public static void main(String[] args) {
     
        System.out.println("Hello World");
        
        var mainMap = new ConcurrentHashMap<Integer, String>();
        
         Random rnd = new Random();
        
        //I don't know from the beginning how many objects
        for(int i=0; i<rnd.nextInt(100); i++){
            mainMap.put(i,"elem "+i);
        }
        
        
        
            mainMap.forEach(1, (k, v) -> {
                
                //this modelize a sub-task my application executes asynchronously
                Thread t = new Thread(()->{
                System.out.println("Object "+k+" started working <" + v+">");
                
                try {
                    Thread.sleep(k*500);    
                    } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
            }
        
        
                System.out.println("Object "+k+" done after "+(k*500)+" ms");
                });
                
                t.start();
            });
            
            //I want to print this only after all jobs are finished
            //but still I don't want to block the main (GUI) thread
            System.out.println("All job done !");
            
       return;
    }
}

现在的输出是这样的:

Hello World
All job done !
Object 0 started working <elem 0>
Object 2 started working <elem 2>
Object 4 started working <elem 4>
Object 3 started working <elem 3>
Object 1 started working <elem 1>
Object 0 done after 0 ms
Object 1 done after 500 ms
Object 2 done after 1000 ms
Object 3 done after 1500 ms

但我期待这样的东西:

Hello World
Object 0 started working <elem 0>
Object 2 started working <elem 2>
Object 4 started working <elem 4>
Object 3 started working <elem 3>
Object 1 started working <elem 1>
Object 0 done after 0 ms
Object 4 done after 2000 ms
Object 1 done after 500 ms
Object 2 done after 1000 ms
Object 3 done after 1500 ms
All job done !
lrpiutwd

lrpiutwd1#

看起来,你完全错过了API的要点。当您调用mainMap.forEach(1, …)时,操作已经并行执行,创建新线程没有意义,尤其是每个元素都没有一个线程。您正在颠覆在forEach实现中所做的全部工作(线程池的使用),并产生了您现在试图解决的问题。
forEach方法并行运行操作,当它返回时,操作已经完成。

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class Main {
  public static void main(String[] args) {
    System.out.println("Hello World");
    var mainMap = new ConcurrentHashMap<Integer, String>();
    var rnd = ThreadLocalRandom.current();

    // I don't know from the beginning how many objects
    for(int i = 0; i < rnd.nextInt(100); i++) {
      mainMap.put(i, "elem " + i);
    }

    mainMap.forEach(1, (k, v) -> {
      System.out.println("Object " + k
          + " started working <" + v + "> " + Thread.currentThread());
      try {
        Thread.sleep(k * 500);
      }
      catch(InterruptedException e1) {
        e1.printStackTrace();
      }
      System.out.println("Object " + k + " done after " + (k * 500) + " ms");
    });

    System.out.println("All job done !");
  }
}
Hello World
Object 0 started working <elem 0> Thread[#1,main,5,main]
Object 0 done after 0 ms
Object 1 started working <elem 1> Thread[#1,main,5,main]
Object 2 started working <elem 2> Thread[#22,ForkJoinPool.commonPool-worker-1,5,main]
Object 4 started working <elem 4> Thread[#23,ForkJoinPool.commonPool-worker-2,5,main]
Object 1 done after 500 ms
Object 2 done after 1000 ms
Object 3 started working <elem 3> Thread[#22,ForkJoinPool.commonPool-worker-1,5,main]
Object 4 done after 2000 ms
Object 5 started working <elem 5> Thread[#23,ForkJoinPool.commonPool-worker-2,5,main]
Object 3 done after 1500 ms
Object 5 done after 2500 ms
All job done !

结果与您的预期略有不同,因为第一个密钥的等待时间太短。
如果您希望避免阻塞启动器线程,例如因为它是事件调度线程,所以将整个操作提交给后台线程:

CompletableFuture<Void> operation = CompletableFuture.runAsync(() -> {
  mainMap.forEach(1, (k, v) -> {
    System.out.println("Object " + k
        + " started working <" + v + "> " + Thread.currentThread());
    try {
      Thread.sleep(k * 500);
    }
    catch(InterruptedException e1) {
      e1.printStackTrace();
    }
    System.out.println("Object " + k + " done after " + (k * 500) + " ms");
  });
});

然后,您可以轮询操作状态,例如

System.out.println(operation.isDone()? "done.": "still running");

而且还包括与链相关的操作,例如

operation.thenAccept(_void -> System.out.println("All job done !"));

或者,对于您的实际用例更有趣的是,

operation.thenAcceptAsync(_void -> {
    /* update UI components */
  }, EventQueue::invokeLater);

以在完成之后更新事件分派线程中的UI组件,同时在运行时不阻塞事件分派线程。

j5fpnvbx

j5fpnvbx2#

一个简单的方法是将所有的Thread收集到一个列表中,然后join每个线程(在一个单独的Thread中,这样就不会阻塞主线程)

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class X {
    public static void main(String[] args) throws InterruptedException {

        System.out.println("Hello World");

        var mainMap = new ConcurrentHashMap<Integer, String>();

        Random rnd = new Random();

        //I don't know from the beginning how many objects
        for (int i = 0; i < rnd.nextInt(100); i++) {
            mainMap.put(i, "elem " + i);
        }

        List<Thread> threads = new ArrayList<>();

        mainMap.forEach(1, (k, v) -> {

            //this modelize a sub-task my application executes asynchronously
            Thread t = new Thread(() -> {
                System.out.println("Object " + k + " started working <" + v + ">");

                try {
                    Thread.sleep(k * 500);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }

                System.out.println("Object " + k + " done after " + (k * 500) + " ms");
            });
            threads.add(t);
            t.start();
        });
        Executors.newSingleThreadExecutor().submit(() -> {
            for (Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            System.out.println("All job done !");
        });
        System.out.println("Main thread not blocked");
    }
}

但是我不认为无限线程数的策略是好的--通常存在一个最佳的并行度,这取决于处理器的数量和每个线程执行的IO量。
我认为最好使用线程池:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class X {
    public static void main(String[] args) throws InterruptedException {

        System.out.println("Hello World");

        var mainMap = new ConcurrentHashMap<Integer, String>();

        Random rnd = new Random();

        //I don't know from the beginning how many objects
        for (int i = 0; i < rnd.nextInt(100); i++) {
            mainMap.put(i, "elem " + i);
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);
        mainMap.forEach(1, (k, v) -> {
            executor.execute(() -> {
                System.out.println("Object " + k + " started working <" + v + ">");
                try {
                    Thread.sleep(k * 500);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                System.out.println("Object " + k + " done after " + (k * 500) + " ms");
            });
        });
        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                executor.shutdown();
                executor.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("All job done !");
        });
        System.out.println("Main thread not blocked");
    }
}

相关问题