未触发RabbitMQ侦听器

yqhsw0fo  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(153)

我正在处理一个asp.netcore 6.0项目。
我正在使用RabbitMQ来实现cart。即:付款成功后,应进行预订。
首先我创建队列:

var factory = new ConnectionFactory
                    {
                        Uri = new Uri(_config.GetValue<string>("AmpqUrl")),
                    };

                    try
                    {
                        using var connection = factory.CreateConnection();
                        using var channel = connection.CreateModel();

                        channel.QueueDeclare(queue: "confirmed_payments", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        var data = new
                        {
                            transactionId,
                            paymentConfirmedAt = DateTime.UtcNow,
                        };

                        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

                        channel.BasicPublish(exchange: "", routingKey: "confirmed_payments", basicProperties: null, body: body);

                    }
                    catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
                    {
                               Console.WriteLine("ex.ToString()");

                    }

和侦听器(另一个项目):

public Task ListenPaymentConfimations(CancellationToken cancellationToken)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri(_configuration.GetValue<string>("AmpqUrl")),
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        var confimedPaymentsConsumer = new EventingBasicConsumer(_channel);

        confimedPaymentsConsumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);

            var data = JsonConvert.DeserializeAnonymousType(response,
                new { transactionId = "", paymentConfirmedAt = "" }
            );

            var date = DateTime.Parse(data.paymentConfirmedAt).ToUniversalTime();

            using var scope = _serviceProvider.CreateScope();

            var dbService =
                scope.ServiceProvider.GetRequiredService<ITechneDbService>();

            var isPaymentConfimed = await dbService.UpdateCartPaymentConfirmedAt(data.transactionId, date);

            _logger.LogInformation("Transaction - {0}", data.transactionId);
            _logger.LogInformation("Transaction - {0}", date);
            _logger.LogInformation("Payment Confirmed - {0}", isPaymentConfimed);

            if (isPaymentConfimed)
            {
               // handle booking
            }
        };

        _channel.BasicConsume(queue: "confirmed_payments",
                     autoAck: true,
                     consumer: confimedPaymentsConsumer);

        return Task.CompletedTask;
    }

当我尝试预订酒店时,有时会命中Listener,我可以成功预订。但有时不会命中listener(创建队列时没有抛出异常)。

我不知道为什么会这样。
任何人都可以找到问题,请帮助我。

rjjhvcjd

rjjhvcjd1#

更改连接的构造函数,以指示应异步调度使用者。

var factory = new ConnectionFactory
{
    Uri = new Uri(_configuration.GetValue<string>("AmpqUrl")),
    DispatchConsumersAsync = true // <---- this
};

您可能还想改用AsyncEventingBasicConsumer而不是EventingBasicConsumer

相关问题