rabbitmq AMQP Laravel-NestJS RPC,无法创建响应

am46iovg  于 2022-12-23  发布在  RabbitMQ
关注(0)|答案(1)|浏览(145)

我使用amq.rabbitmq. reply-to伪队列realese RPC,但无法达到我的请求的答案。在RPC NestJs发布者- NestJs消费者它工作得很好。但如果我切换发布者到Laravel我的NestJS客户端不创建响应。Laravel发布者:

$this->response   = null;
        $this->corr_id    = Str::uuid()->toString();
        $waitingQueue     = 'test_queue';
        $connection       = new AMQPStreamConnection( 'rabbitmq', 5672, 'root', 'root' );
        $channel          = $connection->channel();
        $responseQueue    = 'amq.rabbitmq.reply-to';
        $channel->basic_consume(
            'amq.rabbitmq.reply-to',
            '',
            false,
            true,
            false,
            false,
            function ( $response ) {
            if ( $response->get( 'correlation_id' ) == $this->corr_id ) {
                $this->response = $response->body;
            }
        }
        );

        $data = json_encode( [
            'pattern' => 'test-pattern',
            'data'    => 'some_data'
        ] );

        $msg = new AMQPMessage(
            $data,
            [
                'correlation_id' => $this->corr_id,
                'reply_to'       => $responseQueue,
                'delivery_mode'  => 1,
                'headers'        => []
            ]
        );

        $channel->basic_publish(
            $msg,
            '',
            $waitingQueue,
            true
        );
        while ( !$this->response ) {
            $channel->wait( );
        }
        $channel->close();
        $connection->close();
        dump( $this->response );

嵌套客户端:

@MessagePattern('test-pattern')
public async getAll(data) {
    const answer = await this.somelogic
    return answer
}

嵌套Js发布者:

return await this.client.send('test-pattern', 'some_data').toPromise()

其中客户端:

return ClientProxyFactory.create({
            transport: Transport.RMQ,
            options: {
                urls: [`amqp://root:root@localhost:5672`],
                queue: 'test_queue',
            }
        })

只有当我在自定义处理程序上切换NestJs客户端时,它才适用于Laravel出版商。

@MessagePattern('test-pattern')
public async getAll(@Payload() requestData, @Ctx() context: RmqContext) {
    const channel = context.getChannelRef();
    const originalMsg = context.getMessage();
    const answer = await this.somelogic

    channel.sendToQueue(
        originalMsg.properties.replyTo,
        Buffer.from(
            JSON.stringify({
                data: answer,
            })
        ),
        {
            correlationId: originalMsg.properties.correlationId
        }
    )
    channel.ack(originalMsg)
}

但我希望保留原始代码,以便可以在不同的消息代理之间切换。
我的消息日志:从拉腊维尔出发:

Exchange    (AMQP default)
Routing Key test_queue
Redelivered ○
Properties  
reply_to:   amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjU3Mzg4MDYAAGcRAAAAAGHC8ds=.9BLsGuzLIVkwrmFmR4lO6g==
correlation_id: c546bc52-2e8e-4725-96d0-81752f36d597
delivery_mode:  1

来自嵌套:

Exchange    (AMQP default)
Routing Key test_queue
Redelivered ○
Properties  
reply_to:   amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjU3Mzg4MDYAAGc5AAAAAGHC8ds=.v0yse/O8NTuRgKfRnORs9g==
correlation_id: b2b07701-d25d-4618-b5a8-8b73bf52428d
delivery_mode:  1
headers:
xkrw2x1b

xkrw2x1b1#

一年后,你的问题肯定已经解决了,但是对于任何有同样问题的人来说。NestJs原生RabbitMQ库只在你的消息包含一个UUID键id时才将消息发送到回复队列。
更改以下内容:

$data = json_encode( [
     'pattern' => 'test-pattern',
     'data'    => 'some_data'
 ]);

收件人:

$data = json_encode( [
     'pattern' => 'test-pattern',
     'data'    => 'some_data'
     'id'.     => Str:uuid() // use Illuminate\Support\Str;
 ]);

相关问题