rabbitmq 消息传递系统的应用程序集成场景?

2jcobegt  于 2023-04-20  发布在  RabbitMQ
关注(0)|答案(2)|浏览(169)

我有一个场景,其中有一个后端系统和一个前端系统,中间有一个基于.NET的中间件。后端系统提供了一个REST API,供前端系统使用。
通信总是从POST请求开始。这在后端系统上创建了一个会话。所有后续请求都包含SessionId。许多客户端交互并在此系统上获得自己的会话。后端系统可以并行处理不同的会话,例如通过HTTP GET API返回信息。
但是,有一个限制:每个会话,读和写请求只能由后端系统顺序处理。下面的屏幕截图说明了这一点:x1c 0d1x这个限制使前端复杂化,因为它实际上想向后端系统发送并行请求。为了保持前端简单,我想通过消息传递系统解耦.net中间件层和后端。
有一些众所周知的模式,比如竞争消费者模式等等。不幸的是,我还没有找到适合我的场景的标准模式。有人知道创建这种模式的合适方法吗?如果可能的话,RabbitMQ或ActiveMQ/Artemis是可能的。
该方案必须在本地运行,不能依赖于特定的云技术。此外,解决方案不应降低整体系统性能。
在我的理解中,我需要为每个SessionId(从后端)动态创建一个消息队列。该会话上的所有请求都需要存储在这个特定的队列中,并按顺序处理。因此,后端上的队列数量将与会话数量一样多。下图说明了两个会话A和B的请求和响应队列的基本思想:

有没有人知道如何解决这个问题?提前感谢!
许多问候

2exbekwf

2exbekwf1#

一个简单的解决方案是在发送进一步的消息之前等待响应,但这可能会降低性能。
另一种选择是将多个消息打包成一个更大的消息。这个message aggregation可以保证顺序。它可以在客户端或服务器端完成。
"exactly once in order" policy (EOIO)会带来性能损失。在每条消息中写入严格递增的跟踪ID可能会有所帮助。这将允许服务器检测无序消息。这取决于整体要求,服务器应该等待延迟消息的时间。

7uzetpgm

7uzetpgm2#

如何使用信号量来确保每个会话一次只执行一次。

public class MyClass
{
    private readonly SemaphoreService semaphoreService;

    public MyClass(SemaphoreService semaphoreService)
    {
        this.semaphoreService = semaphoreService;
    }

    public async Task Write(string sessionId)
    {
        await semaphoreService.Execute(async () =>
        {
            await WriteToDataBase();
            await DoSomeOtherStuff();
        }, sessionId);
    }

    public async Task Read(string sessionId)
    {
        await semaphoreService.Execute(async () =>
        {
            //Do stuff
        }, sessionId);
    }

    private async Task WriteToDataBase()
    {
        //Do stuff
    }

    private async Task DoSomeOtherStuff()
    {
        //Do stuff
    }
}

public class SemaphoreService
{
    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Semaphores = new();

    public void Delete(string sessionId)
    {
        Semaphores.Remove(sessionId, out var _);
    }

    public SemaphoreSlim GetSemaphore(string sessionId)
    {
        return Semaphores.GetOrAdd(sessionId, _ => new SemaphoreSlim(1));
    }

    public async Task Execute(Func<Task> action, string sessionId)
    {
        var sem = GetSemaphore(sessionId);
        await sem.WaitAsync();
        try
        {
            await action();
        }
        finally
        {
            sem.Release();
        }
    }
}

您可能需要挂接SemaphoreService.Delete以便在会话结束时调用。

相关问题