通常在Java中对for循环进行多线程处理

tvz2xvvm  于 2023-01-07  发布在  Java
关注(0)|答案(4)|浏览(397)

假设给定整数ab,这样就形成了一个感兴趣的[a,b]中整数的范围,该范围可以跨越10^9整数,我想对给定函数f : N -> N的所有整数a <= n <= b的值求和,该范围非常大,所以我想使用多线程来完成。
不太正式地说,我想并行化以下代码:

long sum = 0;
    
for (long n = a ; n <= b ; n++)
    sum += f(n);
    
System.out.println(sum);

理想情况下(至少在我看来),这个范围将在处理器可用的线程数上平均划分(假设f(n)对于范围内的每个n具有几乎相同的复杂度和运行时间)。这些值是完全独立的,并且f实际上可以是任何函数。例如,它可以输出数字的位数之和。但它真的可以是任何东西,这只是一个例子。
在Java中,是否有一种使用多线程的通用方法可以做到这一点?

zxlwwiss

zxlwwiss1#

并行流

这个特殊的用例非常适合并行流。请参见tutorial by Oraclejava.util.stream.LongStream类用于64位long整数流。
像这样实现:

long sum = 
    LongStream
        .rangeClosed(a, b)
        .parallel()
        .map(n -> f(n))
        .sum();

System.out.println(sum);

参见类似代码run at Ideone.com

t5zmwmid

t5zmwmid2#

您可能想了解一下fork/join framework;这是一种将任务拆分为许多小的独立任务然后将它们组合在一起的原则的通用方法,并为您提供了有关如何拆分线程的所有控制。
或者,您可以使用流API的parallel()方法,但请注意,该方法并不能解释或保证它是如何工作的。您无法控制哪个方面被并行化,也无法控制所涉及的线程数量。对于一般情况,您也可以使用它,但作为一项规则,如果“我最好并行写这个,否则会太慢”是相关的,那么你需要一些保证和一些控制。这里是oracle's tutorial/explanation on parallel streams。对于这个特定的情况,它看起来或多或少会做你想要的(例如,如果您尝试将其应用到的流是Files.lines所提供的,则这会变得棘手--其中并行性可能由于其应用的位置与瓶颈所在的位置而受到阻碍)。

z8dt9xmd

z8dt9xmd3#

RecursiveTask适用于此类问题,线程将由ForkJoinPool管理。
一般的想法是将问题分解为更短的子问题,直到单个线程能够自己管理给定的子问题。

class RecursiveSum extends RecursiveTask<Long> {
    private final int THLD = 1_000;
    private int low, high;
    public RecursiveSum(int high) {
        this(0,high);
    }
    public RecursiveSum(int low, int high) {
        this.low = low; this.high = high;
    }
    
    private long sum(int l,int h) {
        long sum = 0;
        for (int i=l; i<h; i++) sum += i;
        return sum;
    }

    @Override
    protected Long compute() {
        int len = high-low;
        if (len<=THLD) return sum(low,high);
        RecursiveSum rhalf = new RecursiveSum(low+len/2,high);
        rhalf.fork();
        RecursiveSum lhalf = new RecursiveSum(low,low+len/2);
        high = low+len/2;
        return lhalf.compute()+rhalf.join();
    }
}

像这样使用它:

long r = new RecursiveSum(1_000_000_000).invoke();
System.out.println("Sum="+r);
ef1yzkbh

ef1yzkbh4#

1.创建你的sum类(编辑'f'函数到你的函数):

class sumFun implements Callable<Long> {
   public long f(long num){
      return num;
   }
   private long a;
   private long b;
   public sumFun(long a, long b) {
       this.a = a;
       this.b = b;
   }
   @Override
   public Long call() {
       long sum = 0;
       for (long i = a; i <= b; i++) {
           sum += f(i);
       }
       return sum;
   }
}

1.主要内容:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    long start = 1;
    long end = 10000000;
    int numOfThreads = 5;
    long step = (end - start + 1) / numOfThreads;
    List<sumFun> tasks = new ArrayList<>();
    for (long i = start; i <= end; i += step) {
        long tmp = end;
        if(i+step-1 < end){
            tmp = i+step-1;
        }
        tasks.add(new sumFun(i, tmp));
    }
    ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
    List<Future<Long>> results = executor.invokeAll(tasks);
    long sum = 0;
    for (Future<Long> result : results) {
        sum += result.get();
    }
    System.out.println("sum = " + sum);
    executor.shutdown();
 }

相关问题