Symfony Messenger consumer pool

iyfamqjs  posted on 2023-10-24  in  Other

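I'm not sure whether this is possible: can I have a pool of Messenger consumers for the same queue?
I tried the Redis consumer= option, but that is not what it does.
Maybe a "pool" middleware could route messages to specific transports?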

Answer 1 (cwtwac6a)

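Yes, you can use multiple consumers for one queue. Symfony Messenger is designed to run with multiple consumers. The example configuration in the Supervisor section of the documentation already runs 2 instances:
Supervisor configuration files typically live in the /etc/supervisor/conf.d directory. For example, you can create a new messenger-worker.conf file there to make sure that 2 instances of messenger:consume are running at all times:
(..)
So you can run bin/console messenger:consume async multiple times, and in most cases this will work without any additional configuration. There is one caveat about using the Redis transport with multiple workers:
If you use the Redis Transport, note that each worker needs a unique consumer name to avoid the same message being handled by multiple workers. One way to achieve this is to set an environment variable in the Supervisor configuration file, which you can then refer to in messenger.yaml (see the Redis section above):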
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
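
For the messenger.yaml side, a minimal sketch of how that variable could be referenced through the Redis transport's consumer option. The transport name async matches the command above; the MESSENGER_TRANSPORT_DSN variable name is an assumption, so adapt it to your project:

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                # assumed to hold a redis:// DSN
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # unique per worker, set by Supervisor through the environment= line
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
```

With this in place, every Supervisor process reads the same stream under its own consumer name, so a given message should be delivered to only one worker.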

Answer 2 (iyfjxgzm)

I came up with a solution based on multiple sub-queues/classes, with a main one (sync transport) that dispatches to them.

<?php

declare(strict_types=1);

namespace App\MessageHandler;

use App\Message\Job;
use App\Message\JobInterface;
use App\Message\JobPool00;
use App\Message\JobPool01;
use App\Message\JobPool02;
// repeat...
use App\Message\JobPoolInterface;
use m2mQL\Job\JobManager;
use m2mQL\Service\Semaphore;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class JobHandler implements MessageHandlerInterface
{
    protected $pool;
    protected $logger;

    public function __construct(LoggerInterface $logger, int $pool = 0)
    {
        $this->pool = $pool;
        $this->logger = $logger;
    }

    protected JobManager $jm;

    /**
     * @required
     */
    public function setJobManager(JobManager $jm)
    {
        $this->jm = $jm;
    }

    protected Semaphore $semaphore;

    /**
     * @required
     */
    public function setSemaphore(Semaphore $semaphore)
    {
        $this->semaphore = $semaphore;
    }

    protected MessageBusInterface $mb;

    /**
     * @required
     */
    public function setMessageBus(MessageBusInterface $mb)
    {
        $this->mb = $mb;
    }

    public const POOL_CLASS_MAP = [
        JobPool00::class,
        JobPool01::class,
        JobPool02::class,
        // repeat, each class uses a common interface+trait...
    ];

    public function __invoke(JobInterface $job)
    {
        // Run the job right away when pooling is disabled or the message is already a pool-specific class.
        if (!$this->pool || Job::class !== \get_class($job)) {
            $this->jm->runJob($job->getJob());
            if ($this->pool > 0 && $job instanceof JobPoolInterface) {
                // The job is done: decrement that pool's counter.
                $this->decrPool($job->getPool());
            }

            return;
        }

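        // Pool dispatch: the main transport is 'sync', so this part runs in the dispatching process.
        // Each worker has its own transport, mapped to a dedicated message class (ugly but supported).

        // Spread the load by picking the least-used pool.
        $pool = $this->getLeastUsedPool();
        $this->logger->info("Using pool $pool");
        // Increment until the counter is at least 1 (this also repairs a counter that went negative).
        while ($this->incrPool($pool) < 1) {
            // keep incrementing
        }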
        $c = self::POOL_CLASS_MAP[$pool];
        $message = new $c($job->getJob(), $pool);

        $this->mb->dispatch($message);
    }

    public const COUNTERS_KEY = 'workers_pool_counters';

    protected function getLeastUsedPool(): int
    {
        $counters = $this->semaphore->getCounters(self::COUNTERS_KEY);
        if (empty($counters)) {
            return \rand(0, $this->pool - 1);
        }
        for ($i = 0; $i < $this->pool; ++$i) {
            $idx = sprintf('%02d', $i);
            if (empty($counters[$idx])) {
                $counters[$idx] = 0;
            }
        }
        \asort($counters, SORT_NUMERIC);
        $value = reset($counters);
        $counters = \array_filter($counters, function ($v, $k) use ($value) {
            if ($v !== $value) {
                return false;
            }
            $k = (int) $k;
            if ($k >= $this->pool) {
                return false;
            }

            return true;
        }, ARRAY_FILTER_USE_BOTH);
        if (empty($counters)) {
            return \rand(0, $this->pool - 1);
        }

        return (int) \array_rand($counters);
    }

    protected function incrPool(int $pool): int
    {
        return $this->semaphore->incrCounter(self::COUNTERS_KEY, sprintf('%02d', $pool));
    }

    protected function decrPool(int $pool): int
    {
        return $this->semaphore->decrCounter(self::COUNTERS_KEY, sprintf('%02d', $pool));
    }

    public function resetPool(int $pool): void
    {
        $this->semaphore->setCounter(self::COUNTERS_KEY, sprintf('%02d', $pool), 0);
    }
}
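
The Semaphore service used by the handler is not shown in this answer. To make the counter logic easier to follow, here is a minimal sketch with an in-memory stand-in. The class InMemorySemaphore and the function leastUsedPool are hypothetical names; the method signatures are only inferred from how the handler calls them. A real implementation would need storage shared between processes (for example a Redis hash), and unlike the handler this sketch breaks ties by taking the lowest index rather than a random one.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for the Semaphore service.
 * It only works within one process; real workers need shared storage.
 */
final class InMemorySemaphore
{
    /** @var array<string, array<int|string, int>> counters grouped by key */
    private array $counters = [];

    public function getCounters(string $key): array
    {
        return $this->counters[$key] ?? [];
    }

    public function incrCounter(string $key, string $field): int
    {
        $this->counters[$key][$field] = ($this->counters[$key][$field] ?? 0) + 1;

        return $this->counters[$key][$field];
    }

    public function decrCounter(string $key, string $field): int
    {
        $this->counters[$key][$field] = ($this->counters[$key][$field] ?? 0) - 1;

        return $this->counters[$key][$field];
    }

    public function setCounter(string $key, string $field, int $value): void
    {
        $this->counters[$key][$field] = $value;
    }
}

/**
 * Returns the index of the pool with the lowest counter.
 * Missing counters count as 0; on ties the lowest index wins.
 */
function leastUsedPool(array $counters, int $poolSize): int
{
    $best = 0;
    $bestCount = PHP_INT_MAX;
    for ($i = 0; $i < $poolSize; ++$i) {
        $count = $counters[sprintf('%02d', $i)] ?? 0;
        if ($count < $bestCount) {
            $best = $i;
            $bestCount = $count;
        }
    }

    return $best;
}
```

The dispatcher increments a pool's counter before sending a job to it, and the worker decrements it once the job has run, so the counters approximate the number of jobs in flight per pool.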

Configuration:

parameters:
    app.messenger.common_pool_options: 'delete_after_ack=true'

framework:
    messenger:

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            jobs_pool00: '%env(MESSENGER_TRANSPORT_DSN_JOBS_POOLXX)%00?%app.messenger.common_pool_options%'
            jobs_pool01: '%env(MESSENGER_TRANSPORT_DSN_JOBS_POOLXX)%01?%app.messenger.common_pool_options%'
            jobs_pool02: '%env(MESSENGER_TRANSPORT_DSN_JOBS_POOLXX)%02?%app.messenger.common_pool_options%'
            # repeat...

        routing:
            # Route your messages to the transports
            'App\Message\Job': sync # pool dispatch
            'App\Message\JobPool00': jobs_pool00
            'App\Message\JobPool01': jobs_pool01
            'App\Message\JobPool02': jobs_pool02
            ...
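
The routing above relies on one message class per pool, which the answer describes as sharing a common interface and trait but does not show. A minimal sketch of what they might look like follows. JobPoolTrait is a hypothetical name, the constructor signature is inferred from the handler's `new $c($job->getJob(), $pool)` call, and the job payload is left untyped because its real type is not given:

```php
<?php

declare(strict_types=1);

namespace App\Message;

interface JobInterface
{
    /** Returns the job payload (its type is not specified in the answer). */
    public function getJob();
}

interface JobPoolInterface extends JobInterface
{
    /** Returns the index of the pool this message was dispatched to. */
    public function getPool(): int;
}

/** Hypothetical trait holding the code shared by every JobPoolXX class. */
trait JobPoolTrait
{
    private $job;
    private int $pool;

    public function __construct($job, int $pool)
    {
        $this->job = $job;
        $this->pool = $pool;
    }

    public function getJob()
    {
        return $this->job;
    }

    public function getPool(): int
    {
        return $this->pool;
    }
}

// One empty class per pool: Messenger routes by class name,
// so each class can be mapped to its own transport.
final class JobPool00 implements JobPoolInterface
{
    use JobPoolTrait;
}

final class JobPool01 implements JobPoolInterface
{
    use JobPoolTrait;
}

final class JobPool02 implements JobPoolInterface
{
    use JobPoolTrait;
}
```

The classes differ only by name, which is what lets the routing section send each one to a different transport.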

Supervisor starts X workers:

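[program:jobworker]
command=php bin/console m2mql:messenger:consume jobs_pool%(process_num)02d
; Supervisor requires process_num in process_name when numprocs is greater than 1
process_name=%(program_name)s_%(process_num)02d
numprocs=10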
