我对RabbitMQ有点陌生,但我理解AMQP的概念。我一直在尝试在Laravel中实现它,因为我看到的所有实现都不能达到我想要的效果。
我有一个队列数组,这些队列需要通过单个Artisan命令逐个处理。首先是queue1
,然后是queue2
,依此类推。我知道,当您调用basic_consume
时,它会注册一个使用者并开始使用消息,如果没有正确关闭,消费者保持活动状态,当启动另一个basic_consume
时,它将创建另一个消费者。这将导致在两个消费者之间拆分队列。
在我的测试Artisan命令中,我试图清除一个队列,并在完成后关闭消费者。
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class AMQPconsume extends Command
{
protected $signature = 'queues:amqp_consume';
protected $description = 'Binds all queues to exchanges';
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
$callback = function ($msg) {
Log::info('Message proccessed: ' . $msg->body);
$this->output->writeln( ' [x] Received ' . $msg->body);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('update_order_status', '', false, true, false, false, $callback);
$channel->consume();
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
少数观察结果:
1.不向日志中写入任何内容
1.我可以得到控制台输出
1.该命令从未关闭。它一直在运行。
如何确保在处理完队列后,命令停止并终止/注销使用者?
1条答案
按热度按时间kh212irz1#
解决方案原来是很难找到,但很容易做到...
它将抛出一个异常,然后,只是尝试...捕捉异常,并重新开始。