.net 在作业计划程序服务上使用Task.Run不合适

gopyfrb3  于 2023-05-02  发布在  .NET
关注(0)|答案(2)|浏览(295)

我有一个作业调度程序(windows服务),每隔几分钟运行一次,并试图为长时间运行的任务做异步作业。而且我确实看到在运行多个作业时CPU使用率很高。我需要避免使用任务。以不同的方式接近它?

public void Run()
    {
        var queueManager = new QueueManager();
        foreach (var taskId in queueManager.GetTaskIdsToProcess())
        {
            Task.Run(async () => InvokeProcess(taskId));
        }            
    }

    private void InvokeProcess(long taskId)
    {
        try
        {
            //Long Running process code
        }
        catch (Exception ex)
        {
            //update queue failed
        }
    }
u2nhd7ah

u2nhd7ah1#

我是否需要避免使用Task.Run并以不同的方式处理它?
Task.Run很好。CPU峰值的原因是您分配给Task.Run的工作是CPU受限的,至少部分是这样。使用Task.Run本身的开销很小。你可以每秒启动一百万个Task.Run,而CPU甚至不会注意到(假设每个Task.Run的工作为零)。如果你想降低CPU的使用率,你基本上有两个选择:
1.减少在任何给定时刻并发运行的任务的数量。
1.优化每个任务执行的工作。
两种选择都不是微不足道的。优化调度或算法的程度取决于您对任务并行库的熟悉程度和专业知识,以及。NET API和数据结构。

fykwrbwg

fykwrbwg2#

我认为这不是Task.Run本身,但要么InvokeProcess是CPU密集型的,要么你的代码会导致线程池上同时调度大量的任务,而线程池又会调度多达ThreadPool.GetAvailableThreads(out var workerThreads, out var _)个线程,因此CPU负载很高,或者两者兼而有之。
通常的方法是有某种并发队列和有限数量的Task/Thread处理它。例如,一个简单的可以看起来像:

class Processor
{
    private readonly CancellationToken _stoppingToken;
    private DefaultBackgroundTaskQueue Queue = new(1000);
    private Task[] _Processors;

    public Processor(CancellationToken stoppingToken)
    {
        _stoppingToken = stoppingToken;
        var instances = 8;
        _Processors = Enumerable.Range(0, instances)
            .Select(_ => Task.Run(async () =>
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var taskId = Queue.DequeueAsync(stoppingToken);
                    try
                    {
                        // process taskId - InvokeProcess(taskId)
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e); // log
                    }

                }
            }))
            .ToArray();
    }
    public async Task Run()
    {
        var queueManager = new QueueManager();
        foreach (var taskId in queueManager.GetTaskIdsToProcess())
        {
            await Queue.QueueBackgroundWorkItemAsync(taskId);
        }            
    }

    private void InvokeProcess(long taskId)
    {
        try
        {
            //Long Running process code
        }
        catch (Exception ex)
        {
            //update queue failed
        }
    }
}

public sealed class DefaultBackgroundTaskQueue 
{
    private readonly Channel<long> _queue;

    public DefaultBackgroundTaskQueue(int capacity)
    {
        BoundedChannelOptions options = new(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<long>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(long workItem)
    {
        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<long> DequeueAsync(CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}

还可以查看:

相关问题