不同机器上的java并行流

z9ju0rcb  于 2021-06-26  发布在  Java
关注(0)|答案(2)|浏览(402)

我有一个函数,它在foreach中使用parallelstream迭代列表,然后调用一个api,将项作为param。然后将结果存储在hashmap中。

try {
            return answerList.parallelStream()
                    .map(answer -> getReplyForAnswerCombination(answer))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } catch (final NullPointerException e) {
            log.error("Error in generating final results.", e);
            return null;
        }

当我在笔记本电脑1上运行时,需要1个小时。但在笔记本电脑2上,需要5个小时。
做了一些基础研究,我发现并行流使用默认的forkjoinpool.commonpool,默认情况下,它的线程数比处理器少一个。
laptop1和laptop2有不同的处理器。
有没有办法找出在laptop1和laptop2上可以并行运行的流数?
我可以使用这里给出的建议来安全地增加laptop2中并行流的数量吗?

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});
agxfikkp

agxfikkp1#

设置 java.util.concurrent.ForkJoinPool.common.parallelism 将对可用于使用 ForkJoinPool ,例如 Stream.parallel() . 但是:您的任务是否使用更多线程取决于流中的项目数,运行所需时间是否更短取决于每个任务的性质和可用的处理器。
此测试程序显示了通过一项简单任务更改此系统属性的效果:

public static void main(String[] args) {
    ConcurrentHashMap<String,String> threads = new ConcurrentHashMap<>();
    int max     = Integer.parseInt(args[0]);
    boolean parallel = args.length < 2 || !"single".equals(args[1]);
    int [] arr = IntStream.range(0, max).toArray();

    long start = System.nanoTime();

    IntStream stream = Arrays.stream(arr);
    if (parallel)
        stream = stream.parallel();
    stream.forEach(i -> {
        threads.put("hc="+Thread.currentThread().hashCode()+" tn="+Thread.currentThread().getName(), "value");
    });
    long end = System.nanoTime();

    System.out.println("parallelism: "+System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
    System.out.println("Threads: "+threads.keySet());
    System.out.println("Array size: "+arr.length+" threads used: "+threads.size()+" ms="+TimeUnit.NANOSECONDS.toMillis(end-start));
}

添加更多的线程并不一定会加快速度。下面是一些来自测试运行的例子来统计所使用的线程。它可以帮助您为您自己的任务决定最佳的方法 getReplyForAnswerCombination() .

java -cp example.jar -Djava.util.concurrent.ForkJoinPool.common.parallelism=1000 App 100000
Array size: 100000 threads used: 37

java -cp example.jar -Djava.util.concurrent.ForkJoinPool.common.parallelism=50 App  100000
Array size: 100000 threads used: 20

java -cp example.jar APP 100000 single
Array size: 100000 threads used: 1

我建议您在@basil bourque answer中看到线程池(带或不带loom),forkjoinpool构造函数的jdk源代码中也有关于这个系统属性的一些细节。

private ForkJoinPool(byte forCommonPoolOnly)
sulc1iza

sulc1iza2#

织布机项目

如果您想在阻塞的线程代码上获得最大性能(与cpu绑定的代码相反),那么可以使用projectloom中提供的虚拟线程(fibres)。基于早期的accessjava16,现在可以提供初步的构建。

虚拟线程

虚拟线程可以大大加快速度,因为一个虚拟线程在被阻塞、搁置时处于“停驻”状态,所以另一个虚拟线程可以取得进展。这对于阻塞任务非常有效,线程数可以达到数百万。
放下溪流接近。只需将每个输入发送到一个虚拟线程。

完整示例代码

让我们定义 Answer 以及 Reply 我们的投入和产出。我们将使用 record ,这是Java16的一个新特性,是定义不可变数据驱动类的一种简化方法。编译器隐式地创建构造函数getters, equals & hashCode ,和 toString .

public record Answer (String text)
{
}

…和:

public record Reply (String text)
{
}

定义要提交给执行器服务的任务。我们写了一个名为 ReplierTask 实现 Runnable (有一个 run 方法)。
run 方法,我们休眠当前线程以模拟等待对数据库、文件系统和/或远程服务的调用。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

public class ReplierTask implements Runnable
{
    private Answer answer;
    ConcurrentMap < Answer, Reply > map;

    public ReplierTask ( Answer answer , ConcurrentMap < Answer, Reply > map )
    {
        this.answer = answer;
        this.map = map;
    }

    private Reply getReplyForAnswerCombination ( Answer answer )
    {
        // Simulating a call to some service to produce a `Reply` object.
        try { Thread.sleep( Duration.ofSeconds( 1 ) ); } catch ( InterruptedException e ) { e.printStackTrace(); }  // Simulate blocking to wait for call to service or db or such.
        return new Reply( UUID.randomUUID().toString() );
    }

    // `Runnable` interface
    @Override
    public void run ( )
    {
        System.out.println( "`run` method at " + Instant.now() + " for answer: " + this.answer );
        Reply reply = this.getReplyForAnswerCombination( this.answer );
        this.map.put( this.answer , reply );
    }
}

最后,给出了一些代码。我们制作了一个名为 Mapper 包含 main 方法。
我们通过填充一个 Answer 物体。我们创建一个空的 ConcurrentMap 在其中收集结果。我们分配每个 Answer 对象到一个新线程,在该线程中我们调用一个新的 Reply 对象并存储 Answer / Reply 作为Map中的条目配对。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Mapper
{
    public static void main ( String[] args )
    {
        System.out.println("Runtime.version(): " + Runtime.version() );
        System.out.println("availableProcessors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("maxMemory: " + Runtime.getRuntime().maxMemory() + " | maxMemory/(1024*1024) -> megs: " +Runtime.getRuntime().maxMemory()/(1024*1024)  );
        Mapper app = new Mapper();
        app.demo();
    }

    private void demo ( )
    {
        // Simulate our inputs, a list of `Answer` objects.
        int limit = 10_000;
        List < Answer > answers = new ArrayList <>( limit );
        for ( int i = 0 ; i < limit ; i++ )
        {
            answers.add( new Answer( String.valueOf( i ) ) );
        }

        // Do the work.
        Instant start = Instant.now();
        System.out.println( "Starting work at: " + start + " on count of tasks: " + limit );
        ConcurrentMap < Answer, Reply > results = new ConcurrentHashMap <>();
        try
                (
                        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
                        // Executors.newFixedThreadPool( 5 )
                        // Executors.newFixedThreadPool( 10 )
                        // Executors.newFixedThreadPool( 1_000 )
                        // Executors.newVirtualThreadExecutor()
                )
        {
            for ( Answer answer : answers )
            {
                ReplierTask task = new ReplierTask( answer , results );
                executorService.submit( task );
            }
        }
        // At this point the flow-of-control blocks until all submitted tasks are done.
        // The executor service is automatically closed by this point as well.
        Duration elapsed = Duration.between( start , Instant.now() );
        System.out.println( "results.size() = " + results.size() + ". Elapsed: " + elapsed );
    }
}

我们可以换个房间 Executors.newVirtualThreadExecutor() 使用平台线程池,与虚拟线程进行比较。让我们在mac mini intel上尝试5、10和1000个平台线程池,macos mojave支持6个真正的内核,没有超线程,32 Gig内存,openjdk特殊构建版本16 loom+9-316分配的最大内存为8 Gig。
10000个任务,每次1秒总运行时间5个平台线程小时-pt33m29.755792s10平台线程小时-pt16m43.318973s1000平台线程10秒-pt10.487689s10000平台线程错误…
无法创建本机线程:可能内存不足或进程/资源限制在3秒内达到虚拟线程数-pt2.645964s

注意事项

警告:项目织机是实验性的,可能会发生变化,尚未用于生产使用。团队现在要求大家给出反馈。
警告:cpu受限的任务(比如视频编码)应该使用平台/内核线程,而不是虚拟线程。执行阻塞操作(如访问文件、记录日志、访问数据库或进行网络调用)的大多数常见代码可能会看到虚拟线程的大量性能提升。
警告:您必须有足够的内存,以便同时运行许多甚至所有任务。如果没有足够的内存可用,则必须采取其他步骤来限制虚拟线程。

相关问题