Setting up a RabbitMQ consumer in an ASP.NET Core application

i2loujxw posted on 2022-12-26 in RabbitMQ

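I have an ASP.NET Core application in which I want to consume RabbitMQ messages.
I have successfully set up a publisher and a consumer in command-line applications, but I'm not sure how to set them up properly in a web application.
I thought of initializing the consumer in Startup.cs, but of course it dies as soon as startup completes.
What is the right way to initialize the consumer from a web application?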

z3yyvxxp #1

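Use the Singleton pattern for the consumer/listener so that it stays alive for as long as the application runs. Use the IApplicationLifetime interface to start the consumer when the application starts and stop it when the application stops.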

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<RabbitListener>();
    }

    public void Configure(IApplicationBuilder app)
    {
        app.UseRabbitListener();
    }
}

public static class ApplicationBuilderExtentions
{
    //the simplest way to store a single long-living object, just for example.
    private static RabbitListener _listener { get; set; }

    public static IApplicationBuilder UseRabbitListener(this IApplicationBuilder app)
    {
        _listener = app.ApplicationServices.GetService<RabbitListener>();

        var lifetime = app.ApplicationServices.GetService<IApplicationLifetime>();

        lifetime.ApplicationStarted.Register(OnStarted);

        //press Ctrl+C to reproduce if your app runs in Kestrel as a console app
        lifetime.ApplicationStopping.Register(OnStopping);

        return app;
    }

    private static void OnStarted()
    {
        _listener.Register();
    }

    private static void OnStopping()
    {
        _listener.Deregister();    
    }
}
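  • Pay attention to where your app is hosted. For example, IIS can recycle the application and stop your code from running.
  • This pattern can be extended to a pool of listeners.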
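
On ASP.NET Core 3.0 and later, IApplicationLifetime is marked obsolete in favor of IHostApplicationLifetime. The following is a minimal sketch of the same start/stop wiring with the newer interface, not a definitive implementation. It assumes the RabbitListener class from this thread with its Register and Deregister methods; the class name RabbitListenerLifetimeExtensions and the method name UseRabbitListenerWithHostLifetime are hypothetical.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Hypothetical extension class; RabbitListener is the singleton registered in ConfigureServices.
public static class RabbitListenerLifetimeExtensions
{
    public static IApplicationBuilder UseRabbitListenerWithHostLifetime(this IApplicationBuilder app)
    {
        // GetRequiredService throws if the service is missing instead of returning null.
        var listener = app.ApplicationServices.GetRequiredService<RabbitListener>();
        var lifetime = app.ApplicationServices.GetRequiredService<IHostApplicationLifetime>();

        // Start consuming once the host has fully started.
        lifetime.ApplicationStarted.Register(listener.Register);

        // Close the connection when a graceful shutdown begins (for example, Ctrl+C).
        lifetime.ApplicationStopping.Register(listener.Deregister);

        return app;
    }
}
```

Capturing the listener in the callbacks avoids the static field used above; the singleton registration in the container is what keeps the instance alive.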

hgc7kmma #2

Here is my listener:

public class RabbitListener
{
    ConnectionFactory factory { get; set; }
    IConnection connection { get; set; }
    IModel channel { get; set; }

    public void Register()
    {
        channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
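            // In RabbitMQ.Client 6.x, ea.Body is a ReadOnlyMemory<byte>, so copy it to an array before decoding.
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            // Handle the message here.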
        };
        channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
    }

    public void Deregister()
    {
        this.connection.Close();
    }

    public RabbitListener()
    {
        this.factory = new ConnectionFactory() { HostName = "localhost" };
        this.connection = factory.CreateConnection();
        this.channel = connection.CreateModel();

    }
}

6bc51xsx #3

Another option is Hosted Services.
You can create a HostedService and have it call a method that registers the RabbitMQ listener.

public interface IConsumerService
{
    Task ReadMessgaes();
}

public class ConsumerService : IConsumerService, IDisposable
{
    private readonly IModel _model;
    private readonly IConnection _connection;
    public ConsumerService(IRabbitMqService rabbitMqService)
    {
        _connection = rabbitMqService.CreateChannel();
        _model = _connection.CreateModel();
        _model.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);
        _model.ExchangeDeclare("your.exchange.name", ExchangeType.Fanout, durable: true, autoDelete: false);
        _model.QueueBind(_queueName, "your.exchange.name", string.Empty);
    }
    const string _queueName = "your.queue.name";
    public async Task ReadMessgaes()
    {
        var consumer = new AsyncEventingBasicConsumer(_model);
        consumer.Received += async (ch, ea) =>
        {
            var body = ea.Body.ToArray();
            var text = System.Text.Encoding.UTF8.GetString(body);
            Console.WriteLine(text);
            await Task.CompletedTask;
            _model.BasicAck(ea.DeliveryTag, false);
        };
        _model.BasicConsume(_queueName, false, consumer);
        await Task.CompletedTask;
    }

    public void Dispose()
    {
        if (_model.IsOpen)
            _model.Close();
        if (_connection.IsOpen)
            _connection.Close();
    }
}

RabbitMQ service:

public interface IRabbitMqService
{
    IConnection CreateChannel();
}

public class RabbitMqService : IRabbitMqService
{
    private readonly RabbitMqConfiguration _configuration;
    public RabbitMqService(IOptions<RabbitMqConfiguration> options)
    {
        _configuration = options.Value;
    }
    public IConnection CreateChannel()
    {
        ConnectionFactory connection = new ConnectionFactory()
        {
            UserName = _configuration.Username,
            Password = _configuration.Password,
            HostName = _configuration.HostName
        };
        connection.DispatchConsumersAsync = true;
        var channel = connection.CreateConnection();
        return channel;
    }
}

Finally, create a HostedService that calls the ReadMessgaes method (spelled as in the code above) to register the consumer:

public class ConsumerHostedService : BackgroundService
{
    private readonly IConsumerService _consumerService;

    public ConsumerHostedService(IConsumerService consumerService)
    {
        _consumerService = consumerService;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _consumerService.ReadMessgaes();
    }
}

Register the services:

services.AddSingleton<IRabbitMqService, RabbitMqService>();
services.AddSingleton<IConsumerService, ConsumerService>();
services.AddHostedService<ConsumerHostedService>();

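In this case, your consumer stops automatically when the application stops: ConsumerService is a singleton that implements IDisposable, so the service container disposes it on shutdown, which closes the channel and the connection.
Additional information: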

appsettings.json:

{
  "RabbitMqConfiguration": {
    "HostName": "localhost",
    "Username": "guest",
    "Password": "guest"
  }
}

RabbitMQ configuration:

public class RabbitMqConfiguration
{
    public string HostName { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
}
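
The registration above does not show how the appsettings.json section reaches IOptions<RabbitMqConfiguration>. One way to wire it up, sketched here under the assumption of a conventional Startup class that receives IConfiguration, is to bind the section with services.Configure. Without some binding like this, the options object would presumably carry null values for HostName, Username and Password.

```csharp
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Bind the "RabbitMqConfiguration" section of appsettings.json so that
        // IOptions<RabbitMqConfiguration> carries HostName, Username and Password.
        services.Configure<RabbitMqConfiguration>(
            Configuration.GetSection("RabbitMqConfiguration"));

        // Same registrations as in the answer above.
        services.AddSingleton<IRabbitMqService, RabbitMqService>();
        services.AddSingleton<IConsumerService, ConsumerService>();
        services.AddHostedService<ConsumerHostedService>();
    }
}
```

The section name passed to GetSection has to match the top-level key in appsettings.json, and the property names of RabbitMqConfiguration have to match the keys inside it.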
