RabbitMQ不断消耗;所有处理完毕后停止Artisan命令

l7mqbcuq  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(125)

我对RabbitMQ有点陌生,但我理解AMQP的概念。我一直在尝试在Laravel中实现它,因为我看到的所有实现都不能达到我想要的效果。
我有一个队列数组,这些队列需要通过单个Artisan命令逐个处理。首先是queue1,然后是queue2,依此类推。我知道,当您调用basic_consume时,它会注册一个使用者并开始使用消息,如果没有正确关闭,消费者保持活动状态,当启动另一个basic_consume时,它将创建另一个消费者。这将导致在两个消费者之间拆分队列。
在我的测试Artisan命令中,我试图清除一个队列,并在完成后关闭消费者。

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class AMQPconsume extends Command
{

    protected $signature = 'queues:amqp_consume';

    protected $description = 'Binds all queues to exchanges';

    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');

        $channel = $connection->channel();

        $callback = function ($msg) {
            Log::info('Message proccessed: ' . $msg->body);
            $this->output->writeln( ' [x] Received ' . $msg->body);
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('update_order_status', '', false, true, false, false, $callback);

        $channel->consume();

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

少数观察结果:
1.不向日志中写入任何内容
1.我可以得到控制台输出
1.该命令从未关闭。它一直在运行。
如何确保在处理完队列后,命令停止并终止/注销使用者?

kh212irz

kh212irz1#

解决方案原来是很难找到,但很容易做到...

while ($channel->is_consuming()) {
            $channel->wait(null, null, 2); // <-- two is the amount of seconds it has to wait before a timeout is called
        }

它将抛出一个异常,然后,只是尝试...捕捉异常,并重新开始。

相关问题