如何发送RPC replyTo - NestJs Rabbitmq

doinxwow  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(176)

我有一个简单的NestJs微服务应用程序,它正在监听来自Rabbitmq服务的消息。

async function bootstrap() {
  dotenv.config();
  // TODO: Check for ENV valid
  const rabbitOptions = {
    urls: [`amqp://${process.env.MQ_HOSTNAME}:${process.env.MQ_PORT}`],
    queue: process.env.TASK_QUEUE_NAME,
    queueOptions: { durable: false },
  };
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: rabbitOptions,
  });
  await app.listen(() => console.log(`listening...`));
}
bootstrap();

...

@MessagePattern()
async respondQueue(@Payload() payload: AppleDto, @Ctx() context: RmqContext): Promise<any> {
  console.log('received message', payload);
  console.log(context.getArgs()[0].properties);
  return { response: 'wow' };
}

当我从Rabbitmq UI发布消息时,它可以很好地使用消息。
但是我在不同的队列中回复邮件失败。

不确定这是不支持的还是我做错了什么。

piv4azn7

piv4azn71#

当我在模块中创建客户端时,我遇到了这个问题,我把队列作为一个参数,逻辑上,我也应该提供replyQueue。
看看下面的代码,然后我的意见。

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { QueueService } from './queue.service';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ClientsModule.register([
      {
        name: 'queue-module',
        transport: Transport.RMQ,
        options: {
          urls: [process.env.BROKER_URI!],
          queue: process.env.MESSAGING_DISPATCH_QUEUE!,
        },
      },
    ]),
  ],
  controllers: [],
  providers: [QueueService],
  exports: [QueueService],
})
export class QueueModule {}

我做了一个接口的小过程。在ClientsModule里面得到ClientsModuleOptionsClientsModuleOptions实际上是ClientProviderClientProvider可以是ClientOptionsClientOptions在Rabbitmq的情况下是RmqOptions。在RmqOptions下面,它是:

export interface RmqRecordOptions {
    expiration?: string | number;
    userId?: string;
    CC?: string | string[];
    mandatory?: boolean;
    persistent?: boolean;
    deliveryMode?: boolean | number;
    BCC?: string | string[];
    contentType?: string;
    contentEncoding?: string;
    headers?: Record<string, string>;
    priority?: number;
    messageId?: string;
    timestamp?: number;
    type?: string;
    appId?: string;
}
export declare class RmqRecord<TData = any> {
    readonly data: TData;
    options?: RmqRecordOptions;
    constructor(data: TData, options?: RmqRecordOptions);
}
export declare class RmqRecordBuilder<TData> {
    private data?;
    private options?;
    constructor(data?: TData);
    setOptions(options: RmqRecordOptions): this;
    setData(data: TData): this;
    build(): RmqRecord;
}

因此,如果我按如下所示更新我的app.module.ts,replyQueue将是我的值。

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { QueueService } from './queue.service';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ClientsModule.register([
      {
        name: 'queue-module',
        transport: Transport.RMQ,
        options: {
          urls: [process.env.BROKER_URI!],
          queue: process.env.MESSAGING_DISPATCH_QUEUE!,
          replyQueue: process.env.MESSAGING_DISPATCH_REPLY_QUEUE!,
        },
      },
    ]),
  ],
  controllers: [],
  providers: [QueueService],
  exports: [QueueService],
})
export class QueueModule {}
hlswsv35

hlswsv352#

我已经在同一个问题上纠结了好几天了。NestJS RabbitMQ微服务只会在有效负载包含id字段时才将响应发送回reply_to队列。否则,响应永远不会发送回RabbitMQ。
在您的情况下,请更换:

{ "data": "apples" }

签署人:

{ "id": "apples233", "data": "apples" }

完整地说,在观察NestJS RabbitMQ生产者和消费者之间的双向通信时,我注意到生产者发送到队列的消息是这样格式化的:

  • 内容 *
reply_to = amq.rabbitmq.reply-to
correlation_id = <uuid>
  • 有效负载 *
{
  "id": "<the same uuid as for the correlation_id property>",
  "pattern": "<the string you'll set in the MessagePattern decorator>",
  "data":
  {
    ...
  }
}

reply_toamq.rabbitmq.reply-to值触发RabbitMQ的Direct Reply-To特性,但是如果您希望响应进入您选择的预定义队列,则可以设置任何其他值。

相关问题