在rxjs中重新订阅WebSocket通道

kmpatx3s  于 2023-05-07  发布在  其他
关注(0)|答案(1)|浏览(171)

刚刚开始使用rxjs,如果问题是基本的,很抱歉。我正在尝试用它在nodejs服务器端环境中实现一个WebSocket客户端和订阅。我需要连接到WebSocket服务器,通过发送特定消息订阅频道,摄取传入消息。我希望实现在断开连接后重新连接并重新订阅,从而保持持久的开放订阅。我已经找到了如何重新连接,到目前为止,原型看起来如下:

import WebSocket from 'ws';
import { retry, RetryConfig } from 'rxjs/operators'
import { webSocket } from 'rxjs/webSocket';

const ws$ = webSocket({
  url: '...',
  WebSocketCtor: WebSocket as any // apparently typings bug
});

// set up reconnects
const retryConfig: RetryConfig = {
  delay: 3000,
};

ws$.pipe(retry(retryConfig));

// subscribe to channel by sending a message
ws$.next({cmd: 'subscribe', channel: 'updates'});

ws$.subscribe({
  next: (msg) => console.log('msg', msg),
  error: (e) => console.error('error', e),
  complete: () => console.log('complete')
});

到目前为止,这似乎是有效的,但在断开连接后,它只会重新打开连接,我想知道在这种情况下如何执行自定义逻辑?特别是在这里,我想重新发送订阅消息。
除了一个关于如何做到这一点的说明之外,我真的很希望能解释一下为什么会这样,因为rxjs似乎有点需要学习曲线。谢谢大家!

1cklez4t

1cklez4t1#

你需要像这样捕获pipe()的返回值:

ws$ = ws$.pipe(retry(retryConfig));

相关问题