在Nest.js上通过Redis Adapter使用Socket.IO的正确方法是什么

ycl3bljg  于 2022-09-21  发布在  Redis
关注(0)|答案(2)|浏览(215)

我正在尝试使用Socket.IO创建一个样例Nest.js应用程序,以使用Socket.IO将事件发布到多个微服务。

似乎在Redis v4中,当客户端被创建时,它不再自动连接。

我有以下适配器:

import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { ServerOptions } from 'socket.io';

const pubClient = createClient({
  url: 'redis://redis:6379',
  password: 'pass',
});

const subClient = pubClient.duplicate();

const redisAdapter = createAdapter(pubClient, subClient);

export class RedisIoAdapter extends IoAdapter {
  override createIOServer(port: number, options?: ServerOptions): any {
    const server = super.createIOServer(port, options);

    server.adapter(redisAdapter);

    return server;
  }
}

调用this.server.to('room').emit()时,我收到以下错误:

server  | /app/node_modules/@node-redis/client/dist/lib/client/index.js:407
server  |         return Promise.reject(new errors_1.ClientClosedError());
server  |                               ^
server  | 
server  | ClientClosedError: The client is closed
server  |     at Commander._RedisClient_sendCommand (/app/node_modules/@node-redis/client/dist/lib/client/index.js:407:31)
server  |     at Commander.commandsExecutor (/app/node_modules/@node-redis/client/dist/lib/client/index.js:166:154)
server  |     at Commander.BaseClass.<computed> [as publish] (/app/node_modules/@node-redis/client/dist/lib/commander.js:8:29)
server  |     at RedisAdapter.broadcast (/app/node_modules/@socket.io/redis-adapter/dist/index.js:406:28)
server  |     at BroadcastOperator.emit (/app/node_modules/socket.io/dist/broadcast-operator.js:109:22)
server  |     at AppGateway.handleMessage (/app/dist/app.gateway.js:21:30)
server  |     at /app/node_modules/@nestjs/websockets/context/ws-context-creator.js:43:33
server  |     at processTicksAndRejections (node:internal/process/task_queues:96:5)
server  |     at async AppGateway.<anonymous> (/app/node_modules/@nestjs/websockets/context/ws-proxy.js:11:32)
server  |     at async WebSocketsController.pickResult (/app/node_modules/@nestjs/websockets/web-sockets-controller.js:91:24)
server  | 
server  | Node.js v17.4.0
server exited with code 1

我尝试降级到"redis": "^3.1.2""socket.io-redis": "^6.0.0",并更新了相关代码(例如,使用RedisClient而不是createClient),一切似乎运行正常(我使用三台服务器在K8中尝试了这一点,所有客户端都收到了消息)。

不过,我想用最新的版本。考虑到RedisClient.connect是一个异步函数,在这种情况下连接到Redis的正确方式是什么?createIOServer也不是一个异步函数,所以我也不能在那里调用connect

仅供参考,以下是我的main.ts文件:

async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(AppModule);
  app.useWebSocketAdapter(new RedisIoAdapter(app));
  await app.listen(3000);
}
bootstrap();

app.gateway.ts

import { SubscribeMessage, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

type Payload = {
  name: String;
  text: String;
};

@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class AppGateway {
  @WebSocketServer() server: Server;

  @SubscribeMessage('msgToServer')
  handleMessage(client: Socket, payload: Payload) {
    this.server.to('msgRoom').emit('msgToClient', payload);
  }

  handleConnection(client: Socket, ...args: any[]) {
    client.join('msgRoom');
  }
}
wwwo4jvm

wwwo4jvm1#

您应该使用redis3 npm i --save redis@3。只有微服务文档会告诉我们这一点,但对整个Nest包都有效。

import { IoAdapter } from '@nestjs/platform-socket.io';
import { Server, ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

import { INestApplication } from '@nestjs/common';
import { ConfigurationService } from 'src/configuration/configuration.service';

export class RedisIoAdapter extends IoAdapter {
  protected redisAdapter;

  constructor(app: INestApplication) {
    super(app);
    const configService = app.get(ConfigurationService);

    const pubClient = createClient({
      host: configService.get('REDIS_HOST'),
      port: configService.get('REDIS_PORT'),
    });
    const subClient = pubClient.duplicate();

    this.redisAdapter = createAdapter(pubClient, subClient);
  }

  createIOServer(port: number, options?: ServerOptions) {
    const server = super.createIOServer(port, options) as Server;

    server.adapter(this.redisAdapter);

    return server;
  }
}

它可以与Redis@4(https://socket.io/docs/v4/redis-adapter/#usage)配合使用

import { IoAdapter } from '@nestjs/platform-socket.io';
import { Server, ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

import { INestApplication } from '@nestjs/common';
import { ConfigurationService } from 'src/configuration/configuration.service';

export class RedisIoAdapter extends IoAdapter {
  protected redisAdapter;

  constructor(app: INestApplication) {
    super(app);
    const configService = app.get(ConfigurationService);

    const pubClient = createClient({
      socket: {
        host: configService.get('REDIS_HOST'),
        port: configService.get('REDIS_PORT'),
      },
    });
    const subClient = pubClient.duplicate();

    pubClient.connect();  // <------
    subClient.connect();  // <------

    this.redisAdapter = createAdapter(pubClient, subClient);
  }

  createIOServer(port: number, options?: ServerOptions) {
    const server = super.createIOServer(port, options) as Server;

    server.adapter(this.redisAdapter);

    return server;
  }
}
mkshixfv

mkshixfv2#

我在redis-io.adapter.ts中添加了一个函数来连接客户端:

export async function connectRedis() {
  if (pubClient.isOpen && subClient.isOpen) return;

  await Promise.all([pubClient.connect(), subClient.connect()]);
}

我把它称为main.ts,就在useWebSocketAdapter之前

async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(AppModule);
  await connectRedis();
  app.useWebSocketAdapter(new RedisIoAdapter(app));
  await app.listen(3000);
}
bootstrap();

我还将createAdapter调用移到了createIOServer中,这样,无论何时创建适配器,Redis客户端都将始终连接。现在似乎运行得很好,就像以前的版本一样。

相关问题