let eventObservableWrapper = PublishSubject<Observable<MyEvent>>()
let eventObservable = eventObservableWrapper.switchLatest() // use this one for subscriptions
// to switch to another observable, call -
eventObservableWrapper.onNext(someNewEventObservable)
/**
* ReplayRelay it works just like hot observables, once that an observer subscribe, the Relay will replay all items already emitted
* to another observer.
* it should return 1,2,3,4,5 for both observers.
*/
@Test
public void testReplayRelay() {
ReplayRelay<String> relay = ReplayRelay.create();
relay.subscribe(result -> System.out.println("Observer1:" + result));
relay.call("1");
relay.call("2");
relay.call("3");
relay.subscribe(result -> System.out.println("Observer2:" + result));
relay.call("4");
relay.call("5");
}
5条答案
按热度按时间n3ipq98p1#
对于您正在寻找的内容,最简单的解决方案确实是他们为此提供的方法-
func take(_ count: Int)
。下面是一个操场示例:
结果如下:
是的,这在您希望通过流过滤事件的地方很有用,在存储订阅不方便的地方也很有用。
g2ieeal72#
你为什么要这样做?你特别想解决什么问题?
处置订阅的常用方法是使用处置包。
注意到我是如何在每次订阅前重新创建可丢弃的吗?这将摆脱以前的订阅。
nqwrtyyt3#
请在此处查看有关开关工作方式的详细信息-http://reactivex.io/RxJava/javadoc/rx/Observable.html#switchOnNext(rx.Observable)
x7yiwoj44#
如果我没说错的话,你想做的是订阅,而不是在项目发出后取消订阅。
如果这是你想要的,你可以使用
relay
,你永远不会被取消订阅。您可以在此处查看更多示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java
uujelgoq5#
我做了一些类似于@Odrakir提议的事情。我遇到了同样的问题,但仍然没有找到比这更好的解决方案。