.NET: How to run tasks in parallel, but no more than N tasks per T seconds?

fsi0uk1n  posted on 2023-11-20 in .NET

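I need to run many tasks in parallel, as fast as possible. But if my program runs more than 30 tasks per second, it will be blocked. How can I ensure that no more than 30 tasks run in any 1-second interval?
In other words, if 30 tasks were completed within the last 1-second interval, we must prevent new tasks from starting.
My ugly solution: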

private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();

    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));

        await task;

        timeList.Add(DateTime.Now);

        sem.Release();
    });

    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount 
    || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}

jljoyd4f  #1

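User code should never have to control directly how tasks are scheduled. For one thing, it can't: controlling how tasks run is the TaskScheduler's job. When user code calls .Start(), it simply adds the task to a thread pool queue for execution. await only waits on tasks that are already executing; it does not start them.
The TaskScheduler samples show how to create a limited-concurrency scheduler, but again, there are better, higher-level options.
The question's code does not throttle the queued tasks; it limits how many of them are awaited at a time. They are all already running. This is similar to holding back the output of the previous asynchronous operation in a pipeline, allowing only a limited number of messages through to the next stage.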
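
To illustrate the point that the tasks in the question are already running by the time they are awaited, here is a minimal sketch. The names HotTaskDemo and DoWorkAsync are hypothetical and introduced only for illustration, and Task.Delay stands in for real I/O. It contrasts a List<Task>, where the work starts while the list is being built, with a List<Func<Task>>, where the work starts only when each delegate is invoked. The second shape is what a throttle needs in order to control start times.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    // Counts how many operations have begun executing.
    private static int _started;

    // Hypothetical stand-in for a real I/O-bound operation.
    private static async Task DoWorkAsync()
    {
        Interlocked.Increment(ref _started);
        await Task.Delay(100);
    }

    public static async Task<(int hotStarted, int coldStarted)> RunAsync()
    {
        // Calling the async method starts the operation immediately ("hot" tasks).
        _started = 0;
        List<Task> hot = Enumerable.Range(0, 5).Select(_ => DoWorkAsync()).ToList();
        int hotStarted = Volatile.Read(ref _started);   // all 5 started before any await

        await Task.WhenAll(hot);

        // Wrapping the call in a delegate defers the start until the delegate is invoked.
        _started = 0;
        List<Func<Task>> cold = Enumerable.Range(0, 5)
            .Select(_ => new Func<Task>(DoWorkAsync)).ToList();
        int coldStarted = Volatile.Read(ref _started);  // nothing has started yet

        foreach (Func<Task> start in cold)
            await start();                              // each operation starts here

        return (hotStarted, coldStarted);
    }
}
```

With a List<Func<Task>> as input, a throttle can decide when to invoke each delegate, which is not possible once the tasks have been created.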

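ActionBlock with a delay

The simple, out-of-the-box way is to use an ActionBlock with a limited MaxDegreeOfParallelism, to ensure that no more than N operations can run concurrently. If we know how long each operation takes, we can add a small delay to ensure we don't exceed the limit.
In this case, 7 concurrent workers each perform at most 4 requests per second, for a total of at most 28 requests per second. BoundedCapacity means that at most 7 items will be stored in the input buffer before downloader.SendAsync blocks. This way we avoid flooding the ActionBlock if the operations take too long.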

var downloader = new ActionBlock<string>(
        async url => {
            await Task.Delay(250);
            var response=await httpClient.GetStringAsync(url);
            //Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity=7 }
);

//Start posting to the downloader
foreach(var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;

ActionBlock with SemaphoreSlim

Another option is to combine this block with a SemaphoreSlim that is reset periodically by a timer.

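// Starts with 30 permits. Each request consumes one permit and does NOT return it.
var semaphore = new SemaphoreSlim(30, 30);

// Once per period, top the semaphore back up to 30 permits.
// Note: this is a fixed window, so a burst just before and just after a refill
// can put up to 60 requests inside the same rolling second.
var refreshTimer = new Timer(_ =>
{
    var missing = 30 - semaphore.CurrentCount;
    if (missing > 0) semaphore.Release(missing);
});

var downloader = new ActionBlock<string>(
        async url => {
            // Wait for a permit; the timer, not this delegate, replenishes permits.
            await semaphore.WaitAsync();
            var response=await httpClient.GetStringAsync(url);
            //Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity=5 }
);

//Start the timer right before we start posting 
refreshTimer.Change(1000,1000);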
foreach(....)
{

}
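
A timer that refills permits on a fixed schedule bounds each timer period, but two bursts on either side of a refill can still fall inside the same rolling second. If the limit really applies to any 1-second interval, as the question states, one possible approach is to track the start times themselves. The sketch below assumes a hypothetical helper class, SlidingWindowThrottle, which is not part of any library. It remembers the last starts in a queue and makes the caller wait until the oldest one has left the window. It is a sketch under these assumptions, not a tuned implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: allows at most `limit` starts within any rolling `window`.
public sealed class SlidingWindowThrottle
{
    private readonly int _limit;
    private readonly TimeSpan _window;
    private readonly Stopwatch _clock = Stopwatch.StartNew();     // monotonic time source
    private readonly Queue<TimeSpan> _starts = new Queue<TimeSpan>();
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1); // serializes callers

    public SlidingWindowThrottle(int limit, TimeSpan window)
    {
        if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit));
        if (window <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(window));
        _limit = limit;
        _window = window;
    }

    // Completes when the caller is allowed to start one operation.
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            while (true)
            {
                TimeSpan now = _clock.Elapsed;

                // Forget starts that have already left the rolling window.
                while (_starts.Count > 0 && now - _starts.Peek() >= _window)
                    _starts.Dequeue();

                if (_starts.Count < _limit)
                {
                    _starts.Enqueue(now);   // record this start and let the caller proceed
                    return;
                }

                // The window is full: wait until the oldest start expires, then re-check.
                TimeSpan wait = _starts.Peek() + _window - now;
                await Task.Delay(wait, cancellationToken).ConfigureAwait(false);
            }
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

Inside the ActionBlock delegate, a call such as await throttle.WaitAsync(); placed before the request would take the role of the semaphore wait, with throttle created as new SlidingWindowThrottle(30, TimeSpan.FromSeconds(1)). The limit applies to when operations start, not to when they complete.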

jum4pzuy  #2

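This problem can be solved with a SemaphoreSlim whose capacity is the maximum number of tasks per time interval, together with a Timer that releases the semaphore once the time interval has elapsed after a task was started (compare the RateLimiter class). Below is an implementation based on this idea, with a signature and behavior that mimic the .NET 6 Parallel.ForEachAsync API: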

/// <summary>
/// Projects each element of the source sequence into a new form using an
/// asynchronous delegate, enforcing a limit on the number of concurrent
/// asynchronous operations that can start during a specified time span.
/// </summary>
public static Task<TResult[]> Parallel_ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, CancellationToken, Task<TResult>> body,
    int maxActionsPerTimeUnit,
    TimeSpan timeUnit,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(body);
    if (maxActionsPerTimeUnit < 1)
        throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
    if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
        throw new ArgumentOutOfRangeException(nameof(timeUnit));

    SemaphoreSlim semaphore = new(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
    CancellationTokenSource cts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    List<System.Threading.Timer> timers = new();

    async Task<Task<TResult[]>> EnumerateSourceAsync()
    {
        // This method always completes successfully.
        // Any errors are wrapped in the tasks.
        List<Task<TResult>> tasks = new();
        try
        {
            foreach (TSource item in source)
            {
                try
                {
                    await semaphore.WaitAsync(cts.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    if (cancellationToken.IsCancellationRequested)
                        tasks.Add(Task.FromCanceled<TResult>(cancellationToken));
                    // Otherwise the cancellation was caused by a task failure.
                    break;
                }

                // Launch the task
                Task<TResult> task = body(item, cts.Token).ContinueWith(t =>
                {
                    if (!t.IsCompletedSuccessfully) cts.Cancel();
                    // In case of cancellation propagate the correct token.
                    if (t.IsCanceled && cancellationToken.IsCancellationRequested)
                        return Task.FromCanceled<TResult>(cancellationToken);
                    return t;
                }, default, TaskContinuationOptions.DenyChildAttach |
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default).Unwrap();
                tasks.Add(task);

                // Schedule the release of the semaphore using a Timer.
                System.Threading.Timer timer = new(_ => semaphore.Release());
                timer.Change(timeUnit, Timeout.InfiniteTimeSpan);
                timers.Add(timer);
            }
        }
        catch (Exception ex) { tasks.Add(Task.FromException<TResult>(ex)); }
        return Task.WhenAll(tasks);
    }

    return EnumerateSourceAsync().ContinueWith(t =>
    {
        // Clean up
        Task.WaitAll(timers.Select(t => t.DisposeAsync().AsTask()).ToArray());
        cts.Dispose();
        semaphore.Dispose();
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default).Unwrap().Unwrap();
}

Usage example:

int[] results = await Parallel_ForEachAsync(Enumerable.Range(1, 100), async (n, ct) =>
{
    await Task.Delay(500, ct); // Simulate some asynchronous I/O-bound operation
    return n;
}, maxActionsPerTimeUnit: 30, timeUnit: TimeSpan.FromSeconds(1.0));


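The Parallel_ForEachAsync method propagates asynchronously all errors that the body delegate may throw, in exactly the same way as the built-in Parallel.ForEachAsync API.
For an alternative implementation that compiles on .NET platforms older than .NET 6 and offers different options (includeAsynchronousDuration, onErrorContinue, executeOnCapturedContext, but no cancellationToken), see the 8th revision of this answer.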

yhived7q  #3

Here is a snippet:

var tasks = new List<Task>();

foreach(var item in listNeedInsert)
{
    var task = TaskToRun(item);
    tasks.Add(task);

    if(tasks.Count == 100)
    {
        await Task.WhenAll(tasks);
        tasks.Clear();
    }
}

// Wait for anything left to finish
await Task.WhenAll(tasks);

Note that I would rather add the tasks to a List<Task> and, once they have all been added, await all of them from that same List<Task>.
What you are doing here:

var tasks = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (timeList.Count <= maxIntervalCount 
        || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount]);

        await task;


blocks until the task finishes its work, which makes this call redundant:

Task.WhenAll(tasks).Wait();


Moreover, the line Task.WhenAll(tasks).Wait(); blocks unnecessarily on the WhenAll method.

0yg35tkg  #4

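Is the blocking caused by some server/firewall/hardware limit, or is it based on observation?

You should try using BlockingCollection<Task> or a similar thread-safe collection, especially if the work your tasks do is I/O-bound. You can even set the capacity to 30: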

var collection = new BlockingCollection<Task>(30);

Then you can start 2 async methods:

var population = Task.Run(Populate);
var processing = Task.Run(Dequeue);
await Task.WhenAll(population, processing);

//declared async so that a throttled variant can await a delay inside the loop
async Task Populate()
{
    foreach (...)
        collection.Add(...);
    collection.CompleteAdding();
}
async Task Dequeue()
{
    //IsCompleted is true once adding is finished and the collection is empty
    while(!collection.IsCompleted)
        await collection.Take();                            //consider using TryTake()
}

If the limit exists because of some real constraint (which should be very rare), change Populate() as follows:

var stopper = Stopwatch.StartNew();
for (var i = ....)                                          //instead of foreach
{
    if (i % 30 == 0)
    {
        //read the elapsed time once, so the check and the delay use the same value
        var remaining = 1000 - stopper.ElapsedMilliseconds;
        if (remaining > 0)
            await Task.Delay(TimeSpan.FromMilliseconds(remaining)); //requires Populate() to be async
        stopper.Restart();
    }
    collection.Add(...);
}
collection.CompleteAdding();
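
The fragments above leave the loop bounds and the queued items elided. As a way to see the pieces working together, here is a self-contained sketch under some assumptions that are not in the answer: the collection holds Func<Task> delegates rather than already-started tasks, so the consumer decides when each operation starts, and the class name ThrottledProducerDemo and its parameters are hypothetical. The producer adds at most perInterval items per interval; the consumer starts each item as soon as it arrives.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class ThrottledProducerDemo
{
    // Starts at most `perInterval` work items per `interval` (fixed windows).
    public static async Task RunAsync(
        IReadOnlyList<Func<Task>> workItems, int perInterval, TimeSpan interval)
    {
        using var collection = new BlockingCollection<Func<Task>>(perInterval);

        // Producer: releases work in batches of `perInterval`.
        Task population = Task.Run(async () =>
        {
            try
            {
                var stopper = Stopwatch.StartNew();
                for (int i = 0; i < workItems.Count; i++)
                {
                    if (i > 0 && i % perInterval == 0)
                    {
                        // Wait out the rest of the current interval, if any.
                        TimeSpan remaining = interval - stopper.Elapsed;
                        if (remaining > TimeSpan.Zero)
                            await Task.Delay(remaining);
                        stopper.Restart();
                    }
                    collection.Add(workItems[i]);
                }
            }
            finally
            {
                // Always signal completion so the consumer loop can end.
                collection.CompleteAdding();
            }
        });

        // Consumer: starts each operation on arrival, then awaits them all.
        Task processing = Task.Run(async () =>
        {
            var running = new List<Task>();
            foreach (Func<Task> start in collection.GetConsumingEnumerable())
                running.Add(start());
            await Task.WhenAll(running);
        });

        await Task.WhenAll(population, processing);
    }
}
```

Because the batches are aligned to fixed windows, the end of one batch and the start of the next can still fall within the same rolling interval, so this limits the average rate rather than every possible 1-second span.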
