Node.js & TypeScript: socket-controllers and handy-redis pub/sub user event subscription model not working

Posted by bmp9r5qi on 2021-06-10 in Redis

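I'm trying to build a very basic Socket.IO and Redis subscribe-and-listen system, but the Redis channels seem to be getting mixed up. I've tried everything and nothing has worked.
I have a user1 who has subscribed to their own events and to user2's events. The client code is shown below.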

//user1 - socket setup
var socket = io('${heatmap.socket_host}/${user.id}', {
        'transports': ['websocket'],
        'autoConnect': false,
        'query': {'access': heatmap.accessToken}
      });

//user2 - socket setup
var socket = io('${heatmap.socket_host}/${user2.id}', {
        'transports': ['websocket'],
        'autoConnect': false, // optional
        'query': {'access': heatmap.accessToken},
      });

//user1 client

user.websocket.connect();

  //SUBSCRIBING TO THEMSELVES
  user.websocket.on('connect', (_) {
    print('[connected to me]');
    user.websocket.on('LocationChange', (data) => print('[me] - LocationChange: $data'));
  });

  var devices = await heatmap.devices.find();
  var device = devices.first;

  //POSTING EVENTS TO THEMSELVES
  Timer.periodic(Duration(seconds: 10), (timer) async {
    await heatmap.events.postLocation(55, 45, device.id);
  });

  var people = await heatmap.requests.getGrantedRequests();
  people.sent.first.device.user.websocket.connect();

  //SUBSCRIBING TO USER2
  people.sent.first.device.user.websocket.on('connect', (_) {
    print('connected to ${people.sent.first.device.user.name}');
    people.sent.first.device.user.websocket.on('LocationChange', (data) => print('[other] - LocationChange: $data'));
  });

//user2 client (not subscribed to any events, but is posting events which should be received as `[other] - LocationChange: data`)
Timer.periodic(Duration(seconds: 10), (timer) async {
    await heatmap.events.postLocation(51.34, -0.123, '02355fab-9afa-4e26-b7ea-fe7af867a751');
    print('Posted');
});

On the Node.js side, I use socket-controllers for the sockets and handy-redis for Redis. They are set up as follows.

const redis = createHandyClient(redisPort, redisHost);
const subscriber = createHandyClient(redisPort, redisHost);

Container.set("redis", redis);
Container.set("subscriber", subscriber);

const socketServer = io(webServer);
useSocketServer(socketServer, { controllers: [EventsSocketController] });

I have an endpoint, POST /v1/events, that users post events to. It publishes each event through redisClient as follows:

await this.redisClient.publish(`${user.id}:events`, JSON.stringify(event));
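
For context on what happens to a message published this way: Redis delivers it, together with the channel name, to each connection subscribed to that channel. The sketch below is an in-memory stand-in and not the handy-redis API; `FakeSubscriber`, `userEventsChannel`, and the `[me]`/`[other]` labels are hypothetical names chosen for illustration. It shows that when a single subscriber connection holds several subscriptions, every registered handler runs for every delivered message unless the handler checks the channel argument.

```typescript
type MessageHandler = (channel: string, message: string) => void;

// Hypothetical in-memory stand-in for ONE subscriber connection (not a real Redis client).
class FakeSubscriber {
  private readonly channels: string[] = [];
  private readonly handlers: MessageHandler[] = [];

  subscribe(channel: string): void {
    if (this.channels.indexOf(channel) === -1) this.channels.push(channel);
  }

  onMessage(handler: MessageHandler): void {
    this.handlers.push(handler);
  }

  // Simulates a published message arriving on this connection.
  deliver(channel: string, message: string): void {
    if (this.channels.indexOf(channel) === -1) return; // not subscribed: nothing happens
    for (const handler of this.handlers) handler(channel, message);
  }
}

// Same naming scheme as the publish call above: "<userId>:events".
const userEventsChannel = (userId: string): string => `${userId}:events`;

const subscriber = new FakeSubscriber();
const log: string[] = [];

// Two handlers share one connection and both ignore the channel argument.
subscriber.onMessage((_channel, message) => log.push(`[me] ${message}`));
subscriber.onMessage((_channel, message) => log.push(`[other] ${message}`));
subscriber.subscribe(userEventsChannel("user1"));
subscriber.subscribe(userEventsChannel("user2"));

// A single event published for user2 reaches both handlers.
subscriber.deliver(userEventsChannel("user2"), "from-user2");
console.log(log);
```

Under these assumptions, one event for user2 is logged once per handler, regardless of which user each handler was meant for.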

Below is the EventsSocketController, which is where I manage connections, disconnections, and subscriptions to Redis channels:

@SocketController('/:id')
export class EventsSocketController {
  @Inject("redis")
  private redisClient!: IHandyRedis;

  @Inject("subscriber")
  private subscriberClient!: IHandyRedis;

  @InjectRepository(Session)
  private sessionRepository!: Repository<Session>;

  @InjectRepository(ShareRequest)
  private requestRepository!: Repository<ShareRequest>;

  @InjectRepository(User)
  private userRepository!: Repository<User>;

  @OnConnect()
  async connection(
    @ConnectedSocket() socket: Socket,
    @SocketQueryParam("access") access: string,
    @NspParam('id') userId: string
  ): Promise<void> {

    console.log(socket.id);

    try {
      const session = await this.sessionRepository.findOneOrFail(
        { access },
        { relations: ["user"] }
      );

      this.subscriberClient.redis.on("message", (_, message) => {
        this.redisClient.redis.hgetall(message, (err, res) => {
          socket.emit(JSON.parse(message)['type'], message);
        })
      });

      if (userId && userId != session.user.id) {
        const granted = await this.requestRepository
          .createQueryBuilder("request")
          .innerJoin("request.from", "from", "from.id = :id", { id: session.user.id })
          .where("request.granted = :granted", { granted: true })
          .getMany();
        if (granted.length === 0) throw new NotFoundError();

        const user = await this.userRepository
          .createQueryBuilder("user")
          .where("user.id = :userId", { userId })
          .getOne();
        if (!user) throw new NotFoundError();

        await this.subscriberClient.subscribe(`${user.id}:events`);

      }

      await this.subscriberClient.subscribe(`${session.user.id}:events`);

    } catch (error) {
      socket.emit("BAD_TOKEN");
      socket.disconnect();
    }
  }

  @OnDisconnect()
  async disconnect(@SocketQueryParam("userId") userId: string): Promise<void> {
    await this.subscriberClient.unsubscribe(`${userId}:events`);
  }
}

The problem is that the client receives the following:

[me] - LocationChange: {"type":"LocationChange","data":{"latitude":"51.34","longitude":"-0.123","timestamp":"2020-07-13 16:39:45.757820"}}

[other] - LocationChange: {"type":"LocationChange","data":{"latitude":"51.34","longitude":"-0.123","timestamp":"2020-07-13 16:39:45.757820"}}

[me] - LocationChange: {"type":"LocationChange","data":{"latitude":"55.0","longitude":"45.0","timestamp":"2020-07-13 16:39:49.282664"}}

[other] - LocationChange: {"type":"LocationChange","data":{"latitude":"55.0","longitude":"45.0","timestamp":"2020-07-13 16:39:49.282664"}}

Thanks in advance!
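
One plausible reading of the output above, offered as a hypothesis rather than a confirmed diagnosis: the "message" handler in the controller discards its first argument (the channel) and is registered on the shared subscriber once per connection, so each connected socket would be sent every message from every subscribed channel. A minimal sketch of routing by channel follows. `ChannelRouter`, `Emit`, and the socket ids are hypothetical and not part of socket-controllers or handy-redis; the emit callbacks stand in for `socket.emit`.

```typescript
// Hypothetical stand-in for socket.emit(eventName, payload).
type Emit = (eventName: string, payload: string) => void;

class ChannelRouter {
  // channel name -> (socket id -> emit function)
  private readonly listeners = new Map<string, Map<string, Emit>>();

  // Registers a socket for a channel. Returns true when it is the first
  // listener, i.e. when the caller would need to subscribe in Redis.
  add(channel: string, socketId: string, emit: Emit): boolean {
    let sockets = this.listeners.get(channel);
    const isFirst = sockets === undefined;
    if (sockets === undefined) {
      sockets = new Map<string, Emit>();
      this.listeners.set(channel, sockets);
    }
    sockets.set(socketId, emit);
    return isFirst;
  }

  // Removes a socket everywhere. Returns the channels left without listeners,
  // i.e. the ones the caller could now unsubscribe from in Redis.
  removeSocket(socketId: string): string[] {
    const emptied: string[] = [];
    this.listeners.forEach((sockets, channel) => {
      if (sockets.delete(socketId) && sockets.size === 0) {
        this.listeners.delete(channel);
        emptied.push(channel);
      }
    });
    return emptied;
  }

  // Meant to be called from a single "message" listener with (channel, message).
  // Returns the number of sockets the message was forwarded to.
  dispatch(channel: string, message: string): number {
    const sockets = this.listeners.get(channel);
    if (sockets === undefined) return 0;
    const eventName = String(JSON.parse(message).type);
    sockets.forEach((emit) => emit(eventName, message));
    return sockets.size;
  }
}

const router = new ChannelRouter();
const received: string[] = [];

router.add("user1:events", "socketA", (name) => received.push(`[me] ${name}`));
router.add("user2:events", "socketB", (name) => received.push(`[other] ${name}`));

// Only the socket registered for user2's channel is notified.
router.dispatch("user2:events", JSON.stringify({ type: "LocationChange" }));
console.log(received);
```

In this sketch the subscriber would need only one "message" listener for the whole process, and a disconnect would unsubscribe only from channels that no other socket still listens to.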
