RAbbitMQ:如何使用EventBasicConsumer在队列为空时停止消费、通道和连接

xeufq47z  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(174)

我们有一些队列,有相同数量的错误队列。
我的老板让我写一个窗口服务来消耗错误队列,并将数据传输到DB。Windows服务应该能够管理哪些队列应该被消耗,基于DB表中设置的配置,这意味着例如,如果我不想再消耗特定的队列,我必须能够禁用DB上该特定队列的配置,而不停止Windows服务。
我计划写一个Windows Service Job,在Quartz的帮助下进行调度,每个调度的Job都应该读取DB配置,打开一个连接,启动所有队列的任务,每个任务使用一个通道来消耗队列。通过这个解决方案,我希望解决,因为在任务结束时,通道将被关闭,连接将被关闭,错误队列将被消耗,并且在下一个作业调度中,要读取的队列的数量和名称可能会有所不同。此外,在每个调度中消耗每个队列中的所有消息应该保存打开/关闭连接/通道,如果每个调度消耗单个消息,则打开/关闭连接/通道可能是繁重的,因此调度的时间应该足以在下一个调度之前消耗队列中的所有错误消息。此外,调度队列将给予我机会配置队列在数据库中使用,而无需停止Windows服务。
现在我写了一些代码来测试消耗单个队列的解决方案

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueParName, autoAck: false, consumer: consumer);
bool queueEmpty = false;
while (!queueEmpty)
{
    try
        BasicDeliverEventArgs result;
        bool bRead = consumer.Queue.Dequeue(timeOutQueueEmpty, out result);

        if (bRead)
        {
            var msgBody = Encoding.UTF8.GetString(result.Body);
            // TO DB ...
        }
        else
        {
            queueEmpty = true;
        }
    }
    catch (EndOfStreamException ex)
    {
        // ...
    }
}

问题是EventBasicConsumer已经过时,在许多地方都是为了避免EventBasicConsumer而编写的

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    // TO DB...
};
channel.BasicConsume(queue: queueParName, autoAck: true, consumer: consumer);

但是对于EventBasicConsumer,我无法理解如何在队列为空时停止消费,以便关闭连接和通道
编辑
一个更深层次的解释,正如评论中所问的。把所有的代码都放在这里是很困难的,因为我们使用了很多我们公司的编译库,所以代码不会完全被理解。总之,简化:
1.我有一个主机项目,在其中使用石英包调度作业每N次。
...

var schedule = SimpleScheduleBuilder.Create();
schedule.WithIntervalInSeconds(ConfigurationManager.AppSettings["..."]);
schedule.RepeatForever();
s.ScheduleQuartzJob(q =>
    q.WithJob(() =>
        JobBuilder.Create<RabbitErrorDequeuerJob>().Build())
            .AddTrigger(() =>
                TriggerBuilder.Create()
                    .WithSchedule(schedule)
                    .Build())
);

...
我让Ninject IOC,在Module文件中注入连接

// *** Ninject disposes every Disposable object that has another scope other than InTransientScope
Bind<IConnection>().ToMethod(x =>
{
IConnectionFactory cnf = new ConnectionFactory();
cnf.Uri = new Uri(ConfigurationManager.AppSettings["..."]);
return cnf.CreateConnection();
}).InCallScope();

我有一个Job项目,由Quartz每N次调度一次,其中我有Execute方法(Quartz.IJob接口)

public void Execute(IJobExecutionContext context)
    {
        try
        {
            List<RabbitQueueConfiguration> lst = //...LIST OF QUEUTE TO DEQUE FROM DATABASE
            foreach (RabbitQueueConfiguration queue in lst)
            {
                Task t = Task.Factory.StartNew(() =>
                {
                    DequeuSingleQueue(queue);
                });
            }
        }
        catch (Exception ex)
        {
            _log.FatalFormat("Error ", ex.Message);
            throw;
        }
    }

在DequeuSingleQueue(queue)中,存在出队的核心

t5zmwmid

t5zmwmid1#

您应该使用BasicGet一次处理一条消息。

相关问题