我需要尽可能快地并行运行多个任务。但是如果我的程序每1秒运行超过30个任务,它将被阻塞。如何确保任务在任何1秒间隔内运行不超过30个?
换句话说,如果在最后1秒的时间间隔内完成了30个任务,我们必须阻止新任务启动。
丑陋的解决方案:
private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
var timeList = new List<DateTime>();
var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
var tasksToRun = taskList.Select(async task =>
{
do
{
sem.WaitOne();
}
while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));
await task;
timeList.Add(DateTime.Now);
sem.Release();
});
await Task.WhenAll(tasksToRun);
}
private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
return timeList.Count <= maxIntervalCount
|| DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}
字符串
4条答案
按热度按时间jljoyd4f1#
用户代码永远不应该直接控制任务的调度方式。首先,它不能控制任务的运行方式是TaskScheduler的工作。当用户代码调用
.Start()
时,它只是将任务添加到线程池队列中执行。await
执行已经执行的任务。TaskServer示例展示了如何创建有限并发的服务器,但同样,还有更好的高级选项。
这个问题的代码不会限制排队的任务,它限制了等待的任务数量。它们都已经在运行了。这类似于在管道中删除 previous 异步操作,只允许有限数量的消息传递到下一级。
延迟阻塞
简单的开箱即用的方法是使用一个具有有限MaxDegreeOftenelism的CockBlock,以确保不超过N个并发操作可以同时运行。如果我们知道每个操作需要多长时间,我们可以添加一点延迟,以确保我们不会超过限制。
在这种情况下,7个并发工作线程每秒执行4个请求,总共每秒最多执行28个请求。
BoundedCapacity
意味着在downloader.SendAsync
块之前,最多只有7个项目将存储在输入缓冲区中。这样,如果操作花费太长时间,我们可以避免淹没ActionBlock
。字符串
带SemaphoreSlim的模块
另一种选择是将此合并与由定时器定期重置的
SemaphoreSlim
结合起来。型
jum4pzuy2#
这个问题可以通过
SemaphoreSlim
来解决,SemaphoreSlim
被限制为每个时间间隔的最大任务数,当启动任务后时间间隔过去时,Timer
被释放。(RateLimiter
类)。下面是基于此思想的实现,具有模仿.NET 6Parallel.ForEachAsync
API的签名和行为:字符串
使用示例:
型
Parallel_ForEachAsync
方法异步传播body
委托可能抛出的所有错误,其方式与内置Parallel.ForEachAsync
API完全相同。有关在.NET 6之前的.NET平台上编译的替代实现,并且还具有不同的选项(
includeAsynchronousDuration
,onErrorContinue
和executeOnCapturedContext
,但缺少cancellationToken
),请参阅此答案的8th revision。yhived7q3#
这是一个片段:
字符串
请注意,我宁愿将任务添加到
List<Task>();
中,并在添加完所有任务后,在同一个List<Task>();
中等待所有任务你在这里做什么:
型
阻塞,直到任务完成它的工作,因此进行此调用:
型
此外,这条线
Task.WhenAll(tasks).Wait();
对WhenAll
方法执行不必要的阻塞。0yg35tkg4#
阻塞是由于某些服务器/防火墙/硬件限制还是基于观察?
您应该尝试使用
BlockingCollection<Task>
或类似的thread safe collections,特别是如果您的任务的作业是I/O绑定的。您甚至可以将容量设置为30:字符串
然后你可以启动2个C方法:
型
如果限制由于某些真正的限制而存在(应该是非常罕见的),请按如下方式更改Populate():
型