<?php
declare(strict_types=1);
namespace App\MessageHandler;
use App\Message\Job;
use App\Message\JobInterface;
use App\Message\JobPool00;
use App\Message\JobPool01;
use App\Message\JobPool02;
// repeat...
use App\Message\JobPoolInterface;
use m2mQL\Job\JobManager;
use m2mQL\Service\Semaphore;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
class JobHandler implements MessageHandlerInterface
{
protected $pool;
protected $logger;
public function __construct(LoggerInterface $logger, int $pool = 0)
{
$this->pool = $pool;
$this->logger = $logger;
}
protected JobManager $jm;
/**
* @required
*/
public function setJobManager(JobManager $jm)
{
$this->jm = $jm;
}
protected Semaphore $semaphore;
/**
* @required
*/
public function setSemaphore(Semaphore $semaphore)
{
$this->semaphore = $semaphore;
}
protected MessageBusInterface $mb;
/**
* @required
*/
public function setMessageBus(MessageBusInterface $mb)
{
$this->mb = $mb;
}
public const POOL_CLASS_MAP = [
JobPool00::class,
JobPool01::class,
JobPool02::class,
// repeat, each class uses a common interface+trait...
];
public function __invoke(JobInterface $job)
{
if (!$this->pool || Job::class != \get_class($job)) {
$this->jm->runJob($job->getJob());
if ($this->pool > 0) {
if ($job instanceof JobPoolInterface) {
// decr pool counter
$this->decrPool($job->getPool());
}
}
return;
}
// pool of job consume
// the main transport is 'sync'
// each worker has a transport mapped with a classe (ugly but supported)
// some trafic optimization
$pool = $this->getLeastUsedPool();
$this->logger->info("Using pool $pool");
while ($this->incrPool($pool) < 1) {
// nothing
}
$c = self::POOL_CLASS_MAP[$pool];
$message = new $c($job->getJob(), $pool);
$this->mb->dispatch($message);
}
public const COUNTERS_KEY = 'workers_pool_counters';
protected function getLeastUsedPool(): int
{
$counters = $this->semaphore->getCounters(self::COUNTERS_KEY);
if (empty($counters)) {
return \rand(0, $this->pool - 1);
}
for ($i = 0; $i < $this->pool; ++$i) {
$idx = sprintf('%02d', $i);
if (empty($counters[$idx])) {
$counters[$idx] = 0;
}
}
\asort($counters, SORT_NUMERIC);
$value = reset($counters);
$counters = \array_filter($counters, function ($v, $k) use ($value) {
if ($v !== $value) {
return false;
}
$k = (int) \intval($k);
if ($k >= $this->pool) {
return false;
}
return true;
}, ARRAY_FILTER_USE_BOTH);
if (empty($counters)) {
return \rand(0, $this->pool - 1);
}
return (int) \intval(\array_rand($counters));
}
protected function incrPool(int $pool): int
{
return $this->semaphore->incrCounter(self::COUNTERS_KEY, sprintf('%02d', $pool));
}
protected function decrPool(int $pool): int
{
return $this->semaphore->decrCounter(self::COUNTERS_KEY, sprintf('%02d', $pool));
}
public function resetPool(int $pool): void
{
$this->semaphore->setCounter(self::COUNTERS_KEY, sprintf('%02d', $pool), 0);
}
}
配置:
parameters:
app.messenger.common_pool_options: 'delete_after_ack=true'
framework:
messenger:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
jobs_pool00: '%env(MESSENGER_TRANSPORT_DSN_JOBS_POOLXX)%00?%app.messenger.common_pool_options%'
jobs_pool01: '%env(MESSENGER_TRANSPORT_DSN_JOBS_POOLXX)%01?%app.messenger.common_pool_options%'
jobs_pool02: '%env(MESSENGER_TRANSPORT_DSN_JOBS_POOLXX)%02?%app.messenger.common_pool_options%'
// repeat...
routing:
# Route your messages to the transports
'App\Message\Job': sync # pool dispatch
'App\Message\JobPool00': jobs_pool00
'App\Message\JobPool01': jobs_pool01
'App\Message\JobPool02': jobs_pool02
...
2条答案
按热度按时间cwtwac6a1#
是的,您可以为一个队列使用多个消费者。Symfony Messenger设计为具有多个消费者。文档中Supervisor部分的示例配置已经有2个示例:
Supervisor配置文件通常位于/etc/supervisor/conf.d目录中。例如,您可以在其中创建一个新的messenger-worker.conf文件,以确保始终运行2个messenger:consume示例:
(..)
所以你可以多次运行
bin/console messenger:consume async
,在大多数情况下,这将工作,而无需额外的配置。有一个关于使用Redis传输和多个工人的警告:如果你使用Redis Transport,请注意每个worker都需要一个唯一的消费者名称,以避免多个worker处理相同的消息。实现这一点的一种方法是在Supervisor配置文件中设置一个环境变量,然后你可以在
messenger.yaml
中引用它(参见上面的Redis部分):environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
iyfjxgzm2#
我带来了一个基于多个子队列/类的解决方案,它有一个主服务器(同步传输)。
配置:
主管启动X个工作人员