如何从单例服务调用方法以在整个应用程序生命周期中运行

fdx2calv  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(350)

我在netcore中实现了一个kafka事件总线作为单例服务。服务本身在startup.cs中配置了autofac。该服务具有 Listen() 方法:

public void Listen()
{
    using(var consumer = new Consumer<Null, string>(_config, null, new StringDeserializer(Encoding.UTF8)))
    {
        consumer.Subscribe(new string[] { "business-write-topic" });

        consumer.OnMessage += (_, msg) =>
        {
            Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
            consumer.CommitAsync(msg);
        };

        while (true)
        {
            consumer.Poll(100);
        }
    }
}

我的理解是,为了让这个方法在应用程序的生命周期中不断地侦听消息,我必须从web主机调用program.cs,方法是以某种方式获取与主机关联的serviceprovider,然后检索服务示例,并调用该方法。
我已将默认net core 2.1模板中的program.cs配置为:

public class Program
{
    public static void Main(string[] args)
    {
        var host = CreateWebHost(args);
        host.Run();
    }

    public static IWebHost CreateWebHost(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseStartup<Startup>()
            .Build();
}

除了有可用的主机,所以我可以以某种方式访问服务,我不知道从这里去哪里。我搜索过类似的问题,并在官方文件中四处阅读,但我似乎不知道如何访问该服务,以便我可以拨打电话 Listen() 方法。
这是实现我目标的“前进”方式吗?如果是这样,我该如何进行?如果不是——也就是说——如果这类任务通常是以另一种方式完成的,我该怎么做呢?

flmtquvp

flmtquvp1#

编辑:
下面的答案仍然完全正确。有一个基类叫做 BackgroundService 由microsoft提供,可用于只需要实现 ExecuteAsync(CancellationToken stoppingToken) 而不是 IHostedService . 你可以在这里找到它。为此,您需要安装软件包 Microsoft.Extensions.Hosting .
以前也是有效的答案:我建议使用ihostedservice。ihostedservice实现注册为单例,它们一直运行,直到服务器关闭。
创建此基类

public abstract class HostedService : IHostedService
{
    private Task executingTask;
    private CancellationTokenSource cancellationTokenSource;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        this.cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        this.executingTask = this.ExecuteAsync(this.cancellationTokenSource.Token);

        return this.executingTask.IsCompleted ? this.executingTask : Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (this.executingTask == null)
        {
            return;
        }

        this.cancellationTokenSource.Cancel();

        await Task.WhenAny(this.executingTask, Task.Delay(-1, cancellationToken));
    }

    protected abstract Task ExecuteAsync(CancellationToken cancellationToken);
}

然后创建使用者主机

public class ConsumerHost : HostedService
{
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        using (var consumer = new Consumer<Null, string>(_config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.Subscribe(new string[] {"business-write-topic"});

            consumer.OnMessage += (_, msg) =>
            {
                Console.WriteLine(
                    $"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
                consumer.CommitAsync(msg);
            };

            while (!cancellationToken.IsCancellationRequested) // will make sure to stop if the application is being shut down!
            {
                consumer.Poll(100);
                await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
            }
        }
    }
}

现在在configureservice方法的startup类中添加singleton

public void ConfigureServices(IServiceCollection services)
{
   services.AddSingleton<IHostedService, ConsumerHost>();
}

这个服务现在将在webhost完成构建时启动,并在您关闭服务器时停止。无需手动触发,让网络主机为您执行。

pkwftd7m

pkwftd7m2#

我想你需要后台服务。

public class ListnerBackgroundService : BackgroundService
{
    private readonly ListnerService service;

    public ListnerBackgroundService(ListnerService service)
    {
        this.service = service;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        service.Listen();

        return Task.CompletedTask;
    }
}

并注册:

public void ConfigureServices(IServiceCollection services)
{
   ...
   services.AddSingleton<IHostedService, ListnerBackgroundService>();
   ...
}

相关问题