当发布的对象不共享时,无法接收rebus rabbitMQ消息

eimct9ow  于 2023-11-19  发布在  RabbitMQ
关注(0)|答案(2)|浏览(180)

当试图解决我的问题,不接收消息通过RabbitMQ使用Rebus,我尝试玩这个例子:https://github.com/rebus-org/RebusSamples/tree/master/PubSubNative并发现从发布者(pub)发布到订阅者(subs)的对象,必须是完全相同的共享对象“StringMessage”对于subs和pub。当我在每个项目中创建相同名称和属性的类时,它不起作用。我喜欢它没有共享文件,但有自己的类-如果可能的话

zlhcx6iw

zlhcx6iw1#

是的,这是可能的,但是你必须绕过接收端的序列化,以某种方式解析接收方的StringMessage定义,即使是发送方的StringMessage被序列化了。
你可能想看看the SharedNothing sample,它演示了这个特殊的东西。
主要部分是CustomMessageDeserializer,它使用这个字典将简单的、程序集限定的类型名Map到所需的类型。

798qvoo8

798qvoo82#

我可以只使用RabbitMQ Client发送消息到rebus,而不需要rebus或来自发送方的共享消息类型。然而,在接收端,我定义了所有的类。这通常是一种方式。
我在.net 4.6项目中使用了这个窗口窗体应用程序,它只需要发送消息。
我有这个帮手:

using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;

namespace CMIS.Helpers
{
    public class RebusClient
    {

        public static void SendMessage<T>(string exchangeName, string queueName, string routingKey, Dictionary<string, object> headers, T message)
        {

            string[] qusvr = Context.Variables["QUSVR"].ToString().Split(':');
            string hostName = qusvr[0];
            string userName = qusvr[1];
            string password = qusvr[2];

            ConnectionFactory factory = new ConnectionFactory
            {
                HostName = hostName,
                UserName = userName,
                Password = password
            };

            using (IConnection connection = factory.CreateConnection("CMIS"))
            {
                using (IModel model = connection.CreateModel())
                {
                    model.ExchangeDeclare(exchangeName, "direct", true, false, null);

                    model.QueueDeclare(queueName, true, false, false, null);

                    model.QueueBind(queueName, exchangeName, routingKey, null);

                    var messageBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                    IBasicProperties props = model.CreateBasicProperties();
                    props.Persistent = true;
                    props.ContentType = "text/plain";
                    props.DeliveryMode = 2;
                    props.Headers = headers;
                    model.BasicPublish(exchangeName, routingKey, props, messageBytes);
                }
            }
        }
    }
}

字符串
然后我可以这样发送信息

string facilityCode = claimDetails["FacilityCode"].ToString();
            int claimYear = claimDetails["ClaimYear"].ConvertTo<int>();
            int claimMonth = claimDetails["ClaimMonth"].ConvertTo<int>();
            string requestNo = string.Format("Queue-{0}-{1}{2}", facilityCode, claimYear, claimMonth < 10 ? $"0{claimMonth}" : claimMonth.ToString());
            JObject message = new JObject(
                                            new JProperty("$type", MessagingConstants.QueueUncapturedFolios),
                                            new JProperty("RequestNo", requestNo),
                                            new JProperty("FacilityCode", facilityCode),
                                            new JProperty("ClaimYear", claimYear),
                                            new JProperty("ClaimMonth", claimMonth)
                                         );

            Dictionary<string, object> headers = new Dictionary<string, object>
                                {
                                    { MessagingConstants.MessageTypeHeader, MessagingConstants.QueueUncapturedFolios },
                                    { MessagingConstants.ContentTypeHeader, MessagingConstants.ContentType }
                                };

            RebusClient.SendMessage("RebusDirect", "online_claims_processing", "online_claims_processing", headers, message);


这里是我们使用的常数

public static class MessagingConstants
{
    public const string QueueDocuments = "NHIF.Shared.Messages.QueueDocuments,NHIF.Shared.Messages";
    public const string QueueUncapturedFolios = "NHIF.Shared.Messages.QueueUncapturedFolios,NHIF.Shared.Messages";
    public const string ProcessSkippedFolios = "NHIF.Shared.Messages.ProcessSkippedFolios,NHIF.Shared.Messages";
    public const string CloseSubmissionWindow = "NHIF.Shared.Messages.CloseSubmissionWindow,NHIF.Shared.Messages";
    public const string ContentType = "application/json;charset=utf-8";
    public const string MessageTypeHeader = "rbs2-msg-type";
    public const string ContentTypeHeader = "rbs2-content-type";
}


虽然我可能不会解决你的问题,这种方式可以帮助我避免共享类(消息类型),那里没有必要。这样,消息就序列化了所有的头,rebus将处理它们没有任何问题。
雷加兹

相关问题