RabbitMQ RPC是否以异步方式运行?

tag5nh1u  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(151)

我正在为一个客户做一个项目,他正在使用RabbitMQ和RPC,我不太了解RabbitMQ,我正在努力在互联网上找到一些像样的例子。我需要实现一些异步操作,我会更好地解释自己。
在目前的状态下,我有一个生产者发送一个RPC请求,并等待一个来自消费者的回答,到目前为止一切都很好,一切都工作正常。我的问题是,我不想等待一个回答,我仍然需要一个答案,但我不想等待它在我的生产者。我会在这里张贴我的生产者和消费者的代码。

制作人

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitProducer
{
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RpcClient()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.68.17" };

            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ConfirmSelect();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            string correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                string response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };
            channel.BasicAcks += (sender, ea) =>
            {

            };
            channel.BasicNacks += (sender, ea) =>
            {

            };
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(
                exchange: "",
                routingKey: "Ciccio",
                basicProperties: props,
                body: messageBytes);

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);

            return respQueue.Take();
        }

        public void Close()
        {
            connection.Close();
        }
    }
    class Program
    {
        public static void Main()
        {
            RpcClient rpcClient = new RpcClient();
            Random random = new Random();
            int a = random.Next(10, 50);
            Console.WriteLine("Ciccio");
            Console.WriteLine(a.ToString());
            string response = rpcClient.Call(a.ToString());

            Console.WriteLine(" [.] Got '{0}'", response);
            rpcClient.Close();
            Console.ReadLine();
        }
    }
}

消费者

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
namespace RabbitConsumer
{
    class Program
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "192.168.68.17" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {

                channel.QueueDeclare(queue: "Ciccio", durable: false,
                  exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "Ciccio",
                  autoAck: false, consumer: consumer);
                Console.WriteLine("Ciccio");
                Console.WriteLine(" [x] Awaiting RPC requests");

                consumer.Received += (model, ea) =>
                {
                    string response = null;
                    System.Threading.Thread.Sleep(5000);
                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                          basicProperties: replyProps, body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                          multiple: false);
                    }
                };

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            return fib(n - 1) + fib(n - 2);
        }

    }
}
kuhbmx9i

kuhbmx9i1#

好吧,显然我最近工作太多了,我忘记了正确使用任务......这就是你如何才能达到我所寻找的。

异步生产商

using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitProducer
{
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RpcClient()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.68.17" };

            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ConfirmSelect();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            string correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                string response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);

                }
            };
            channel.BasicAcks += (sender, ea) =>
            {

            };
            channel.BasicNacks += (sender, ea) =>
            {

            };
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(
                exchange: "",
                routingKey: "Ciccio",
                basicProperties: props,
                body: messageBytes);

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
            return respQueue.Take();            
        }

        public void Close()
        {
            connection.Close();
        }
    }
    class Program
    {
        private static void Send(int n)
        {
            Console.WriteLine("SEND " + n);
            Task taskA = new Task(() =>
            {
                RpcClient rpcClient = new RpcClient();
                string resp = rpcClient.Call(n.ToString());

                Console.WriteLine(n + " [.] Got '{0}'", resp);
                rpcClient.Close();
            });
            taskA.Start();
        }

        public static void Main()
        {
            Random random = new Random();
            int a = random.Next(10, 50);
            Send(a);
            int b = random.Next(10, 50);
            Send(b);
            int c = random.Next(10, 50);
            Send(c);
            Console.ReadLine();
        }
    }
}
6jygbczu

6jygbczu2#

完整的async方法是使用TaskCompletionSource。请参阅https://gigi.nullneuron.net/gigilabs/abstracting-rabbitmq-rpc-with-taskcompletionsource/以取得范例。
通过这种方式,逻辑可以抽象到一个中心位置,并在整个应用中使用,就像使用HttpClient.GetAsync()一样。
类似于:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;

namespace RabbitProducer
{
    public class RpcClient : IDisposable
    {
        private bool disposed = false;
        private IConnection connection;
        private IModel channel;
        private EventingBasicConsumer consumer;
        private ConcurrentDictionary<string,
            TaskCompletionSource<string>> pendingMessages;

        private string responseQueueName;
        private const string exchangeName = ""; // default exchange

        public RpcClient()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            this.connection = factory.CreateConnection();
            this.channel = connection.CreateModel();

            this.responseQueueName = this.channel.QueueDeclare().QueueName;

            this.consumer = new EventingBasicConsumer(this.channel);
            this.consumer.Received += Consumer_Received;
            this.channel.BasicConsume(responseQueueName, true, consumer);

            this.pendingMessages = new ConcurrentDictionary<string,
                TaskCompletionSource<string>>();
        }

        public Task<string> SendAsync(string message)
        {
            var tcs = new TaskCompletionSource<string>();
            var correlationId = Guid.NewGuid().ToString();

            this.pendingMessages[correlationId] = tcs;

            this.Publish(message, correlationId);

            return tcs.Task;
        }

        private void Publish(string message, string correlationId)
        {
            var props = this.channel.CreateBasicProperties();
            props.CorrelationId = correlationId;
            props.ReplyTo = responseQueueName;

            byte[] messageBytes = Encoding.UTF8.GetBytes(message);
            this.channel.BasicPublish(exchangeName, "Cissio", props, messageBytes);

            Console.WriteLine($"Sent: {message} with CorrelationId {correlationId}");
        }

        private void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var correlationId = e.BasicProperties.CorrelationId;
            var message = Encoding.UTF8.GetString(e.Body.ToArray());

            Console.WriteLine($"Received: {message} with CorrelationId {correlationId}");

            this.pendingMessages.TryRemove(correlationId, out var tcs);
            if (tcs != null)
                tcs.SetResult(message);
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (!disposed && disposing)
            {
                this.channel?.Dispose();
                this.connection?.Dispose();
            }

            this.disposed = true;
        }

        public static async Task Main()
        {
            using var rpcClient = new RpcClient();
            Random random = new Random();

            var n = random.Next(10, 50);
            var response = await rpcClient.SendAsync(n.ToString());
            Console.WriteLine(n + " [.] Got '{0}'", response);

            n = random.Next(10, 50);
            response = await rpcClient.SendAsync(n.ToString());
            Console.WriteLine(n + " [.] Got '{0}'", response);

            n = random.Next(10, 50);
            response = await rpcClient.SendAsync(n.ToString());
            Console.WriteLine(n + " [.] Got '{0}'", response);

            Console.ReadLine();
        }
    }
}

相关问题