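This article collects code examples for the Java method io.reactivex.Observable.share() and shows how Observable.share() is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven. Details of the Observable.share() method are as follows: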
Package: io.reactivex
Class name: Observable
Method name: share
Returns a new ObservableSource that multicasts (and shares a single subscription to) the original ObservableSource. As long as there is at least one Observer, this ObservableSource will be subscribed and emitting data. When all subscribers have disposed, it will dispose the source ObservableSource.
This is an alias for publish().refCount(), where refCount() is defined on ConnectableObservable.
Scheduler: share does not operate by default on a particular Scheduler.
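The subscription sharing described above can be modelled without the library. What follows is a minimal plain-Java sketch of the reference counting behind publish().refCount(). All names in it (ShareRefCountSketch, Upstream, Shared, ManualUpstream) are hypothetical and not part of RxJava, and the real operator also deals with terminal events, errors, and stricter concurrency rules that this model ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative model only: hypothetical names, not RxJava's implementation.
final class ShareRefCountSketch {

    /** Stand-in for a source: connect() starts delivery and returns a disconnect handle. */
    interface Upstream<T> {
        Runnable connect(Consumer<T> sink);
    }

    /** Ref-counted multicaster: one upstream connection shared by all observers. */
    static final class Shared<T> {
        private final Upstream<T> upstream;
        private final List<Consumer<T>> observers = new ArrayList<>();
        private Runnable connection; // non-null while connected upstream

        Shared(Upstream<T> upstream) {
            this.upstream = upstream;
        }

        /** Adds an observer; the first one triggers the single upstream connection. */
        synchronized Runnable subscribe(Consumer<T> observer) {
            observers.add(observer);
            if (observers.size() == 1) {
                connection = upstream.connect(this::multicast);
            }
            return () -> dispose(observer);
        }

        private synchronized void multicast(T item) {
            // Copy so observers may dispose themselves while being notified.
            for (Consumer<T> observer : new ArrayList<>(observers)) {
                observer.accept(item);
            }
        }

        /** Removes an observer; the last one leaving disconnects the upstream. */
        private synchronized void dispose(Consumer<T> observer) {
            if (observers.remove(observer) && observers.isEmpty()) {
                connection.run();
                connection = null;
            }
        }
    }

    /** Hand-driven upstream that counts connections, for demonstration. */
    static final class ManualUpstream implements Upstream<Integer> {
        int connects;
        int disconnects;
        private Consumer<Integer> sink;

        @Override
        public Runnable connect(Consumer<Integer> sink) {
            connects++;
            this.sink = sink;
            return () -> {
                disconnects++;
                this.sink = null;
            };
        }

        void emit(int value) {
            if (sink != null) {
                sink.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        ManualUpstream source = new ManualUpstream();
        Shared<Integer> shared = new Shared<>(source);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        Runnable disposeFirst = shared.subscribe(first::add);
        source.emit(1);                 // only the first observer exists yet
        Runnable disposeSecond = shared.subscribe(second::add);
        source.emit(2);                 // both observers receive this item

        disposeFirst.run();             // one observer left: stay connected
        disposeSecond.run();            // none left: upstream is disconnected

        System.out.println(first + " " + second + " connects=" + source.connects
                + " disconnects=" + source.disconnects);
    }
}
```

Running main prints `[1, 2] [2] connects=1 disconnects=1`: the late observer misses the item emitted before it subscribed, and the upstream is connected once and released only after the last observer leaves. Subscribing again afterwards would open a fresh upstream connection.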
Code example source: trello/RxLifecycle
/**
 * Binds the given source to a lifecycle.
 * <p>
 * This method determines (based on the lifecycle sequence itself) when the source
 * should stop emitting items. It uses the provided correspondingEvents function to determine
 * when to unsubscribe.
 * <p>
 * Note that this is an advanced usage of the library and should generally be used only if you
 * really know what you're doing with a given lifecycle.
 *
 * @param lifecycle the lifecycle sequence
 * @param correspondingEvents a function which tells the source when to unsubscribe
 * @return a reusable {@link LifecycleTransformer} that unsubscribes the source during the Fragment lifecycle
 */
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                  @Nonnull final Function<R, R> correspondingEvents) {
    checkNotNull(lifecycle, "lifecycle == null");
    checkNotNull(correspondingEvents, "correspondingEvents == null");
    return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
Code example source: alibaba/Tangram-Android
public RxTimer(long interval) {
    this.mInterval = interval;
    this.mStatus = TimerStatus.Waiting;
    this.mIntervalObservable = Observable
            .interval(0, this.mInterval, TimeUnit.MILLISECONDS)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    mStatus = TimerStatus.Running;
                    Log.d("RxTimerSupportTest", "accept " + disposable + " status " + mStatus);
                }
            })
            .doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    mStatus = TimerStatus.Paused;
                    Log.d("RxTimerSupportTest", "on dispose " + " status " + mStatus);
                }
            })
            .doOnTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    mStatus = TimerStatus.Stopped;
                    Log.d("RxTimerSupportTest", "on terminate " + " status " + mStatus);
                }
            })
            .share();
}
Code example source: alibaba/Tangram-Android
public static <T, E> LifecycleTransformer<T> bindToLifeCycle(Observable<E> lifecycle,
                                                             final Function<E, E> correspondingEvents) {
    Observable<E> lifecycleCopy = lifecycle.share();
    return new LifecycleTransformer<>(Observable.combineLatest(lifecycle.take(1).map(correspondingEvents),
            lifecycleCopy.skip(1),
            new BiFunction<E, E, Boolean>() {
                @Override
                public Boolean apply(E e, E e2) throws Exception {
                    return e.equals(e2);
                }
            }).filter(new Predicate<Boolean>() {
                @Override
                public boolean test(Boolean cmpResult) throws Exception {
                    return cmpResult;
                }
            }));
}
Code example source: Polidea/RxAndroidBle
private Observable<RxBleScanResult> createScanOperationApi18(@Nullable final UUID[] filterServiceUUIDs) {
    final Set<UUID> filteredUUIDs = uuidUtil.toDistinctSet(filterServiceUUIDs);
    final LegacyScanOperation scanOperation =
            new LegacyScanOperation(filterServiceUUIDs, rxBleAdapterWrapper, uuidUtil);
    return operationQueue.queue(scanOperation)
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    synchronized (queuedScanOperations) {
                        queuedScanOperations.remove(filteredUUIDs);
                    }
                }
            })
            .mergeWith(this.<RxBleInternalScanResultLegacy>bluetoothAdapterOffExceptionObservable())
            .map(new Function<RxBleInternalScanResultLegacy, RxBleScanResult>() {
                @Override
                public RxBleScanResult apply(RxBleInternalScanResultLegacy scanResult) {
                    return convertToPublicScanResult(scanResult);
                }
            })
            .share();
}
Code example source: ReactiveX/RxJava
sourceUnsubscribed.set(true);
}).share();
Code example source: ReactiveX/RxJava
.share()
Code example source: Polidea/RxAndroidBle
final Observable<Boolean> sharedConnectButtonClicks = activatedClicksObservable(connectButton).share();
final Observable<Boolean> sharedNotifyButtonClicks = activatedClicksObservable(notifyButton).share();
final Observable<Boolean> sharedIndicateButtonClicks = activatedClicksObservable(indicateButton).share();
Code example source: f2prateek/rx-preferences
private RxSharedPreferences(final SharedPreferences preferences) {
    this.preferences = preferences;
    this.keyChanges = Observable.create(new ObservableOnSubscribe<String>() {
        @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
            final OnSharedPreferenceChangeListener listener = new OnSharedPreferenceChangeListener() {
                @Override
                public void onSharedPreferenceChanged(SharedPreferences preferences, String key) {
                    emitter.onNext(key);
                }
            };
            emitter.setCancellable(new Cancellable() {
                @Override public void cancel() throws Exception {
                    preferences.unregisterOnSharedPreferenceChangeListener(listener);
                }
            });
            preferences.registerOnSharedPreferenceChangeListener(listener);
        }
    }).share();
}
Code example source: JakeWharton/RxReplayingShare
@Override public Observable<T> apply(Observable<T> upstream) {
    LastSeen<T> lastSeen = new LastSeen<>();
    return new LastSeenObservable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}
Code example source: bitrich-info/xchange-stream
public Observable<BitmexWebSocketTransaction> subscribeBitmexChannel(String channelName) {
    return subscribeChannel(channelName).map(s -> {
        BitmexWebSocketTransaction transaction = objectMapper.treeToValue(s, BitmexWebSocketTransaction.class);
        return transaction;
    })
    .share();
}
Code example source: manas-chaudhari/android-mvvm
public ViewPagerAdapter(@NonNull Observable<List<ViewModel>> viewModels, @NonNull ViewProvider viewProvider, @NonNull ViewModelBinder binder) {
    source = viewModels
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<List<ViewModel>>() {
                @Override
                public void accept(List<ViewModel> viewModels) throws Exception {
                    latestViewModels = (viewModels != null) ? viewModels : new ArrayList<ViewModel>();
                    notifyDataSetChanged();
                }
            })
            .share();
    this.viewProvider = viewProvider;
    this.binder = binder;
}
Code example source: bitrich-info/xchange-stream
    @Override
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
        Observable<PoloniexWebSocketTradeEvent> subscribedTrades = service.subscribeCurrencyPairChannel(currencyPair)
                .filter(s -> s.getEventType().equals("t"))
                .map(s -> (PoloniexWebSocketTradeEvent) s).share();
        return subscribedTrades
                .map(s -> adaptPoloniexPublicTrade(s.toPoloniexPublicTrade(currencyPair), currencyPair));
    }
} // closes the enclosing class (declaration not shown in this excerpt)
Code example source: bitrich-info/xchange-stream
public Observable<JsonNode> subscribeChannel(String channelName) {
    LOG.info("Subscribing to channel {}.", channelName);
    return Observable.<JsonNode>create(e -> {
        if (!subscriptions.containsKey(channelName)) {
            subscriptions.put(channelName, e);
            pubnub.subscribe().channels(Collections.singletonList(channelName)).execute();
            LOG.debug("Subscribe channel: {}", channelName);
        }
    }).doOnDispose(() -> {
        LOG.debug("Unsubscribe channel: {}", channelName);
        pubnub.unsubscribe().channels(Collections.singletonList(channelName)).execute();
    }).share();
}
Code example source: bitrich-info/xchange-stream
public Observable<PoloniexWebSocketEvent> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
    String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
    return subscribeChannel(channelName)
            .flatMapIterable(s -> {
                PoloniexWebSocketEventsTransaction transaction = objectMapper.treeToValue(s, PoloniexWebSocketEventsTransaction.class);
                return Arrays.asList(transaction.getEvents());
            }).share();
}
Code example source: graphql-java/graphql-java-subscription-example
public StockTickerPublisher() {
    Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
    });
    // share() already multicasts; the extra publish() + connect() subscribes immediately,
    // so the source starts emitting before any downstream subscriber arrives.
    ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
    connectableObservable.connect();
    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
Code example source: manas-chaudhari/android-mvvm
private ReadOnlyField(@NonNull Observable<T> source) {
    super();
    this.source = source.doOnNext(new Consumer<T>() {
        @Override
        public void accept(T t) throws Exception {
            ReadOnlyField.super.set(t);
        }
    }).doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.e("ReadOnlyField", "onError in source observable", throwable);
        }
    }).onErrorResumeNext(Observable.<T>empty()).share();
}
Code example source: networknt/light-example-4j
public ChannelPublisher() {
    Observable<MessageAddedEvent> messageAddedEventObservable = Observable.create(messageAddedEventObservableEmitter -> {
        this.emitter = messageAddedEventObservableEmitter;
    });
    // connect() subscribes right away, which runs the create() callback and assigns this.emitter.
    ConnectableObservable<MessageAddedEvent> connectableObservable = messageAddedEventObservable.share().publish();
    connectableObservable.connect();
    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
Code example source: akarnokd/akarnokd-misc
    @Test
    public void test() {
        Observable<Integer> myObservable = Observable.just(1)
                .<Integer>flatMap(i -> {
                    throw new IllegalStateException();
                }).share();
        myObservable
                .zipWith(myObservable, Pair::of)
                .subscribe(pair -> {
                    // ignore
                }, throwable -> {
                    // ignore
                });
    }
} // closes the enclosing test class (declaration not shown in this excerpt)