.net Parallel.ForEach是否限制活动线程的数量?

vyswwuz2  于 2023-02-10  发布在  .NET
关注(0)|答案(6)|浏览(276)

给定以下代码:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

所有1000个线程几乎同时产生吗?

ohtdti5x

ohtdti5x1#

不,它不会启动1000个线程--是的,它会限制线程的使用数量。并行扩展使用适当数量的内核,它根据你实际拥有的内核数量和已经处于忙碌状态的内核数量来分配工作,然后使用一种叫做“工作窃取”的技术,让每个线程高效地处理自己的队列,而只需要做任何代价高昂的交叉操作。线程在真正需要的时候访问。
看看PFX Team Blog,了解它如何分配工作以及其他各种主题的信息。
请注意,在某些情况下,您也可以指定所需的并行度。

shyt4zoc

shyt4zoc2#

在单核计算机上...并行.ForEach分区它在多个线程之间工作的集合的(块),但是这个数字是基于一个算法计算的,该算法考虑并持续监视它分配给ForEach的线程所做的工作。因此,* 如果ForEach的主体部分调用长时间运行的IO绑定/阻塞函数会让线程等待,算法将产生更多线程,并在它们之间重新划分集合 。如果线程快速完成,并且不会阻塞IO线程(例如,简单地计算一些数字), 算法将增加(或实际上减少)线程的数量到算法认为吞吐量(每次迭代的平均完成时间)最优的点。
基本上,所有各种并行库函数背后的线程池都会计算出要使用的最佳线程数。物理处理器内核的数量只是等式的一部分。内核数量和产生的线程数量之间并不是简单的一对一关系。
我不觉得有关取消和处理同步线程的文档很有帮助。希望微软能在MSDN上提供更好的例子。
不要忘记,主体代码必须编写为在多个线程上运行,沿着所有常见的线程安全考虑,框架还没有抽象出这个因素。

4dc9hkyq

4dc9hkyq3#

问得好。在您的示例中,即使在四核处理器上,并行化水平也相当低,但是经过一些等待,并行化水平可以变得相当高。

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

现在看看添加一个等待操作来模拟HTTP请求时会发生什么。

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

我还没有做任何修改,但是并发/并行的级别已经急剧上升,并发的限制可以随着ParallelOptions.MaxDegreeOfParallelism的增加而增加。

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

我建议设置ParallelOptions.MaxDegreeOfParallelism。它不一定会增加正在使用的线程数量,但它会确保您只启动数量正常的线程,这似乎是您所关心的。
最后回答你的问题,不,你不会让所有的线程同时启动。如果你希望完美地并行调用,比如测试竞态条件,请使用Parallel.Invoke。

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
u2nhd7ah

u2nhd7ah4#

它根据处理器/内核的数量计算出最佳的线程数量,它们不会一次全部繁殖。

oyt4ldly

oyt4ldly5#

请参阅Does Parallel.For use one Task per iteration?以了解要使用的“心智模型”的想法。然而,作者确实声明“在一天结束时,重要的是要记住,实现细节可能随时更改。”

qeeaahzv

qeeaahzv6#

Parallel.ForEach是否限制活动线程的数量?
如果不使用正MaxDegreeOfParallelism配置Parallel.ForEach,则答案为否。使用默认配置,并假设source序列具有足够大的大小,Parallel.ForEach将使用ThreadPool中立即可用的所有线程。并且会不断地请求更多。Parallel.ForEach本身对线程的数量没有限制。它只受相关联的TaskScheduler的能力的限制。

因此,如果您想了解未配置Parallel.ForEach的行为,您必须了解ThreadPool的行为。这非常简单,可以在一个段落中描述。ThreadPool通过启动新线程立即满足所有工作请求,最高可达软限制(默认为Environment.ProcessorCount)。达到此限制后,进一步的请求被排队,并且新线程以每秒一个新线程的速率被创建以满足需求¹。对于线程的数量也有硬限制,在我的机器中是32,767个线程。软限制可以用ThreadPool.SetMinThreads方法来配置。ThreadPool还使线程退役,在有太多的工作并且没有排队的工作的情况下,以大约相同的速率(每秒1个)。
下面是Parallel.ForEach使用ThreadPool中所有可用线程的实验演示。ThreadPool.SetMinThreads预先配置了可用线程的数量,然后Parallel.ForEach启动并使用所有线程:

ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 10);
HashSet<Thread> threads = new();
int concurrency = 0;
int maxConcurrency = 0;
Parallel.ForEach(Enumerable.Range(1, 1500), n =>
{
    lock (threads) maxConcurrency = Math.Max(maxConcurrency, ++concurrency);
    lock (threads) threads.Add(Thread.CurrentThread);
    // Simulate a CPU-bound operation that takes 200 msec
    Stopwatch stopwatch = Stopwatch.StartNew();
    while (stopwatch.ElapsedMilliseconds < 200) { }
    lock (threads) --concurrency;
});
Console.WriteLine($"Number of unique threads: {threads.Count}");
Console.WriteLine($"Maximum concurrency: {maxConcurrency}");

输出(等待约5秒后):

Number of unique threads: 102
Maximum concurrency: 102

Online demo.
completionPortThreads的数量与本测试无关。Parallel.ForEach使用指定为"workerThreads"的线程。102个线程为:

  • 按需立即创建的100个ThreadPool线程。
  • 延迟1秒后又注入了一个ThreadPool线程。
  • 控制台应用程序的主线程。

¹ ThreadPool的注入速率没有文档记录。从. NET 7开始,它是每秒一个线程,但在未来的. NET版本中可能会改变。

相关问题