从ASP.NET核心中的控制器操作运行后台任务

q1qsirdb  于 2022-11-19  发布在  .NET
关注(0)|答案(4)|浏览(158)

我正在使用C#和ASP.NETCore2.0开发一个带有RESTAPI的Web应用程序。
我想要实现的是,当客户端向端点发送请求时,我将运行一个与客户端请求上下文分离的后台任务,如果任务成功启动,则该任务将结束。
我知道有HostedService,但问题是HostedService在服务器启动时启动,据我所知,没有办法从控制器手动启动HostedService
下面是一个简单的代码来演示这个问题。

[Authorize(AuthenticationSchemes = "UsersScheme")]
public class UsersController : Controller
{
    [HttpPost]
    public async Task<JsonResult> StartJob([FromForm] string UserId, [FromServices] IBackgroundJobService backgroundService)
    {
        // check user account
        (bool isStarted, string data) result = backgroundService.Start();

        return JsonResult(result);
    }
}
pinkon5k

pinkon5k1#

您仍然可以将IHostedServiceBlockingCollection结合使用,作为后台任务的基础。
BlockingCollection创建一个 Package ,这样我们就可以将其作为单例注入。
当集合为空时,BlockingCollection.Take不会占用处理器时间。将取消令牌传递给.Take方法将在令牌取消时正常退出。

public class TasksToRun
{
    private readonly BlockingCollection<TaskSettings> _tasks;

    public TasksToRun() => _tasks = new BlockingCollection<TaskSettings>(new ConcurrentQueue<TaskSettings>());

    public void Enqueue(TaskSettings settings) => _tasks.Add(settings);

    public TaskSettings Dequeue(CancellationToken token) => _tasks.Take(token);
}

对于后台进程,我们可以使用IHostedService-Microsoft.Extensions.Hosting.BackgroundService的“内置”实现。
此服务将使用从“队列”中提取的任务。

public class TaskProcessor : BackgroundService
{
    private readonly TasksToRun _tasks;

    public TaskProcessor(TasksToRun tasks) => _tasks = tasks;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Task.Yield(); // This will prevent background service from blocking start up of application

        while (cancellationToken.IsCancellationRequested == false)
        {
            try
            {
                var taskToRun = _tasks.Dequeue(_tokenSource.Token);

                await ExecuteTask(taskToRun);               
            }
            catch (OperationCanceledException)
            {
                // execution cancelled
            }
            catch (Exception e)
            {
                // Catch and log all exceptions,
                // So we can continue processing other tasks
            }
        }
    }
}

然后,我们可以从控制器添加新任务,而无需等待它们完成

public class JobController : Controller
{
    private readonly TasksToRun _tasks;

    public JobController(TasksToRun tasks) => _tasks = tasks;

    public IActionResult PostJob()
    {
        var settings = CreateTaskSettings();

        _tasks.Enqueue(settings);

        return Ok();
    }
}

应将阻塞集合的 Package 器注册为单例依赖项注入

services.AddSingleton<TasksToRun, TasksToRun>();

注册后台服务

services.AddHostedService<TaskProcessor>();
gcuhipw9

gcuhipw92#

这在很大程度上是从skjagini's answer中链接的文档中得到的启发,并进行了一些改进。
我认为在这里重申整个示例可能会有所帮助,以防链接在某个点中断。最值得注意的是,我注入了一个IServiceScopeFactory,以允许后台进程自己安全地请求服务。
核心思想是创建一个任务队列,用户可以将其注入到控制器中,然后向其分配任务。同一个任务队列存在于长时间运行的托管服务中,该服务一次将一个任务出列并执行它。
工作队列:

public interface IBackgroundTaskQueue
{
    // Enqueues the given task.
    void EnqueueTask(Func<IServiceScopeFactory, CancellationToken, Task> task);

    // Dequeues and returns one task. This method blocks until a task becomes available.
    Task<Func<IServiceScopeFactory, CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly ConcurrentQueue<Func<IServiceScopeFactory, CancellationToken, Task>> _items = new();

    // Holds the current count of tasks in the queue.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void EnqueueTask(Func<IServiceScopeFactory, CancellationToken, Task> task)
    {
        if(task == null)
            throw new ArgumentNullException(nameof(task));

        _items.Enqueue(task);
        _signal.Release();
    }

    public async Task<Func<IServiceScopeFactory, CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        // Wait for task to become available
        await _signal.WaitAsync(cancellationToken);

        _items.TryDequeue(out var task);
        return task;
    }
}

在任务队列的核心,我们有一个线程安全的ConcurrentQueue<>。因为我们不想在新任务可用之前轮询队列,所以我们使用SemaphoreSlim对象来跟踪队列中的当前任务数。每次调用Release时,内部计数器都会递增。WaitAsync方法会一直阻塞,直到内部计数器大于0。并且随后将其递减。
为了使任务出队和执行任务,我们创建了一个后台服务:

public class BackgroundQueueHostedService : BackgroundService
{
    private readonly IBackgroundTaskQueue _taskQueue;
    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly ILogger<BackgroundQueueHostedService> _logger;

    public BackgroundQueueHostedService(IBackgroundTaskQueue taskQueue, IServiceScopeFactory serviceScopeFactory, ILogger<BackgroundQueueHostedService> logger)
    {
        _taskQueue = taskQueue ?? throw new ArgumentNullException(nameof(taskQueue));
        _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Dequeue and execute tasks until the application is stopped
        while(!stoppingToken.IsCancellationRequested)
        {
            // Get next task
            // This blocks until a task becomes available
            var task = await _taskQueue.DequeueAsync(stoppingToken);

            try
            {
                // Run task
                await task(_serviceScopeFactory, stoppingToken);
            }
            catch(Exception ex)
            {
                _logger.LogError(ex, "An error occured during execution of a background task");
            }
        }
    }
}

最后,我们需要使任务队列可用于依赖注入,并启动后台服务:

public void ConfigureServices(IServiceCollection services)
{
    // ...
    
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
    services.AddHostedService<BackgroundQueueHostedService>();
    
    // ...
}

现在,我们可以将后台任务队列注入控制器并将任务入队:

public class ExampleController : Controller
{
    private readonly IBackgroundTaskQueue _backgroundTaskQueue;

    public ExampleController(IBackgroundTaskQueue backgroundTaskQueue)
    {
        _backgroundTaskQueue = backgroundTaskQueue ?? throw new ArgumentNullException(nameof(backgroundTaskQueue));
    }

    public IActionResult Index()
    {
        _backgroundTaskQueue.EnqueueTask(async (serviceScopeFactory, cancellationToken) =>
        {
            // Get services
            using var scope = serviceScopeFactory.CreateScope();
            var myService = scope.ServiceProvider.GetRequiredService<IMyService>();
            var logger = scope.ServiceProvider.GetRequiredService<ILogger<ExampleController>>();
            
            try
            {
                // Do something expensive
                await myService.DoSomethingAsync(cancellationToken);
            }
            catch(Exception ex)
            {
                logger.LogError(ex, "Could not do something expensive");
            }
        });

        return Ok();
    }
}

为什么要使用IServiceScopeFactory

理论上,我们可以直接使用已经注入到控制器中的服务对象,这对于单例服务以及大多数作用域服务都很有效。
但是,对于实现IDisposable(例如DbContext)的作用域服务,这可能会中断:将任务入队后,控制器方法返回,请求完成。然后框架清理注入的服务。如果我们的后台任务足够慢或延迟,它可能会尝试调用已释放服务的方法,然后会遇到错误。
为了避免这种情况,我们的排队任务应该总是创建它们自己的服务作用域,并且不应该使用来自周围控制器的服务示例。

km0tfn4u

km0tfn4u3#

Microsoft已在https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1中记录了相同内容
它使用BackgroundTaskQueue完成,BackgroundTaskQueue获取从Controller分配的工作,而该工作由派生自BackgroundService的QueueHostedService执行。

umuewwlo

umuewwlo4#

您可以在ThreadPool中使用另一个线程:
将方法排入队列以供执行。当执行绪集区执行绪变成可用时,就会执行方法。

public class ToDoController : Controller
{
    private readonly IServiceScopeFactory _serviceScopeFactory;
    public ToDoController(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }
    public string Index(Func<IToDoDependency,Task> DoHeavyWork)
    {
        ThreadPool.QueueUserWorkItem(delegate {
            // Get services
            using var scope = _serviceScopeFactory.CreateScope();
            var dependency= scope.ServiceProvider.GetRequiredService<IToDoDependency>();
            DoHeavyWork(dependency);

            // OR 
            // Get the heavy work from ServiceProvider
            var heavyWorkSvc= scope.ServiceProvider.GetRequiredService<IHeavyWorkService>();
            heavyWorkSvc.Do(dependency);
        });
        return "Immediate Response";
    }
}

相关问题