刚刚开始使用rxjs,如果问题是基本的,很抱歉。我正在尝试用它在nodejs服务器端环境中实现一个WebSocket客户端和订阅。我需要连接到WebSocket服务器,通过发送特定消息订阅频道,摄取传入消息。我希望实现在断开连接后重新连接并重新订阅,从而保持持久的开放订阅。我已经找到了如何重新连接,到目前为止,原型看起来如下:
import WebSocket from 'ws';
import { retry, RetryConfig } from 'rxjs/operators'
import { webSocket } from 'rxjs/webSocket';
const ws$ = webSocket({
url: '...',
WebSocketCtor: WebSocket as any // apparently typings bug
});
// set up reconnects
const retryConfig: RetryConfig = {
delay: 3000,
};
ws$.pipe(retry(retryConfig));
// subscribe to channel by sending a message
ws$.next({cmd: 'subscribe', channel: 'updates'});
ws$.subscribe({
next: (msg) => console.log('msg', msg),
error: (e) => console.error('error', e),
complete: () => console.log('complete')
});
到目前为止,这似乎是有效的,但在断开连接后,它只会重新打开连接,我想知道在这种情况下如何执行自定义逻辑?特别是在这里,我想重新发送订阅消息。
除了一个关于如何做到这一点的说明之外,我真的很希望能解释一下为什么会这样,因为rxjs似乎有点需要学习曲线。谢谢大家!
1条答案
按热度按时间1cklez4t1#
你需要像这样捕获
pipe()
的返回值: