我正在处理一个asp.netcore 6.0项目。
我正在使用RabbitMQ来实现cart。即:付款成功后,应进行预订。
首先我创建队列:
var factory = new ConnectionFactory
{
Uri = new Uri(_config.GetValue<string>("AmpqUrl")),
};
try
{
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "confirmed_payments", durable: true, exclusive: false, autoDelete: false, arguments: null);
var data = new
{
transactionId,
paymentConfirmedAt = DateTime.UtcNow,
};
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));
channel.BasicPublish(exchange: "", routingKey: "confirmed_payments", basicProperties: null, body: body);
}
catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
{
Console.WriteLine("ex.ToString()");
}
和侦听器(另一个项目):
public Task ListenPaymentConfimations(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory
{
Uri = new Uri(_configuration.GetValue<string>("AmpqUrl")),
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
var confimedPaymentsConsumer = new EventingBasicConsumer(_channel);
confimedPaymentsConsumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
var data = JsonConvert.DeserializeAnonymousType(response,
new { transactionId = "", paymentConfirmedAt = "" }
);
var date = DateTime.Parse(data.paymentConfirmedAt).ToUniversalTime();
using var scope = _serviceProvider.CreateScope();
var dbService =
scope.ServiceProvider.GetRequiredService<ITechneDbService>();
var isPaymentConfimed = await dbService.UpdateCartPaymentConfirmedAt(data.transactionId, date);
_logger.LogInformation("Transaction - {0}", data.transactionId);
_logger.LogInformation("Transaction - {0}", date);
_logger.LogInformation("Payment Confirmed - {0}", isPaymentConfimed);
if (isPaymentConfimed)
{
// handle booking
}
};
_channel.BasicConsume(queue: "confirmed_payments",
autoAck: true,
consumer: confimedPaymentsConsumer);
return Task.CompletedTask;
}
当我尝试预订酒店时,有时会命中Listener,我可以成功预订。但有时不会命中listener(创建队列时没有抛出异常)。
我不知道为什么会这样。
任何人都可以找到问题,请帮助我。
1条答案
按热度按时间rjjhvcjd1#
更改连接的构造函数,以指示应异步调度使用者。
您可能还想改用
AsyncEventingBasicConsumer
而不是EventingBasicConsumer
。