io.reactivex.Observable.share()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(144)

本文整理了Java中io.reactivex.Observable.share()方法的一些代码示例,展示了Observable.share()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.share()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:share

Observable.share介绍

[英]Returns a new ObservableSource that multicasts (and shares a single subscription to) the original ObservableSource. As long as there is at least one Observer this ObservableSource will be subscribed and emitting data. When all subscribers have disposed it will dispose the source ObservableSource.

This is an alias for #publish(). ConnectableObservable#refCount().

Scheduler: share does not operate by default on a particular Scheduler.
[中]返回一个新的ObservableSource,该ObservableSource多播(并共享一个订阅)原始ObservableSource。只要至少有一个观察者,这个可观察资源就会被订阅并发送数据。当所有订阅者都处理完后,它将处理源ObservableSource。
这是#publish()的别名。可连接可观察#refCount()。
调度程序:共享默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: trello/RxLifecycle

/**
 * Binds the given source to a lifecycle.
 * <p>
 * This method determines (based on the lifecycle sequence itself) when the source
 * should stop emitting items. It uses the provided correspondingEvents function to determine
 * when to unsubscribe.
 * <p>
 * Note that this is an advanced usage of the library and should generally be used only if you
 * really know what you're doing with a given lifecycle.
 *
 * @param lifecycle the lifecycle sequence
 * @param correspondingEvents a function which tells the source when to unsubscribe
 * @return a reusable {@link LifecycleTransformer} that unsubscribes the source during the Fragment lifecycle
 */
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                         @Nonnull final Function<R, R> correspondingEvents) {
  checkNotNull(lifecycle, "lifecycle == null");
  checkNotNull(correspondingEvents, "correspondingEvents == null");
  return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}

代码示例来源:origin: alibaba/Tangram-Android

public RxTimer(long interval) {
  this.mInterval = interval;
  this.mStatus = TimerStatus.Waiting;
  this.mIntervalObservable = Observable
      .interval(0, this.mInterval, TimeUnit.MILLISECONDS)
      .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
          mStatus = TimerStatus.Running;
          Log.d("RxTimerSupportTest", "accept " + disposable + " status " + mStatus);
        }
      })
      .doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
          mStatus = TimerStatus.Paused;
          Log.d("RxTimerSupportTest", "on dispose " + " status " + mStatus);
        }
      })
      .doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
          mStatus = TimerStatus.Stopped;
          Log.d("RxTimerSupportTest", "on terminate " + " status " + mStatus);
        }
      })
      .share();
}

代码示例来源:origin: alibaba/Tangram-Android

public static <T, E> LifecycleTransformer<T> bindToLifeCycle(Observable<E> lifecycle,
  final Function<E, E> correspondingEvents) {
  Observable<E> lifecycleCopy = lifecycle.share();
  return new LifecycleTransformer<>(Observable.combineLatest(lifecycle.take(1).map(correspondingEvents),
    lifecycleCopy.skip(1),
    new BiFunction<E, E, Boolean>() {
      @Override
      public Boolean apply(E e, E e2) throws Exception {
        return e.equals(e2);
      }
    }).filter(new Predicate<Boolean>() {
    @Override
    public boolean test(Boolean cmpResult) throws Exception {
      return cmpResult;
    }
  }));
}

代码示例来源:origin: Polidea/RxAndroidBle

private Observable<RxBleScanResult> createScanOperationApi18(@Nullable final UUID[] filterServiceUUIDs) {
  final Set<UUID> filteredUUIDs = uuidUtil.toDistinctSet(filterServiceUUIDs);
  final LegacyScanOperation
      scanOperation = new LegacyScanOperation(filterServiceUUIDs, rxBleAdapterWrapper, uuidUtil);
  return operationQueue.queue(scanOperation)
      .doFinally(new Action() {
        @Override
        public void run() throws Exception {
          synchronized (queuedScanOperations) {
            queuedScanOperations.remove(filteredUUIDs);
          }
        }
      })
      .mergeWith(this.<RxBleInternalScanResultLegacy>bluetoothAdapterOffExceptionObservable())
      .map(new Function<RxBleInternalScanResultLegacy, RxBleScanResult>() {
        @Override
        public RxBleScanResult apply(RxBleInternalScanResultLegacy scanResult) {
          return convertToPublicScanResult(scanResult);
        }
      })
      .share();
}

代码示例来源:origin: ReactiveX/RxJava

sourceUnsubscribed.set(true);
}).share();

代码示例来源:origin: ReactiveX/RxJava

.share()

代码示例来源:origin: Polidea/RxAndroidBle

final Observable<Boolean> sharedConnectButtonClicks = activatedClicksObservable(connectButton).share();
final Observable<Boolean> sharedNotifyButtonClicks = activatedClicksObservable(notifyButton).share();
final Observable<Boolean> sharedIndicateButtonClicks = activatedClicksObservable(indicateButton).share();

代码示例来源:origin: f2prateek/rx-preferences

private RxSharedPreferences(final SharedPreferences preferences) {
 this.preferences = preferences;
 this.keyChanges = Observable.create(new ObservableOnSubscribe<String>() {
  @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
   final OnSharedPreferenceChangeListener listener = new OnSharedPreferenceChangeListener() {
    @Override
    public void onSharedPreferenceChanged(SharedPreferences preferences, String key) {
     emitter.onNext(key);
    }
   };
   emitter.setCancellable(new Cancellable() {
    @Override public void cancel() throws Exception {
     preferences.unregisterOnSharedPreferenceChangeListener(listener);
    }
   });
   preferences.registerOnSharedPreferenceChangeListener(listener);
  }
 }).share();
}

代码示例来源:origin: JakeWharton/RxReplayingShare

@Override public Observable<T> apply(Observable<T> upstream) {
 LastSeen<T> lastSeen = new LastSeen<>();
 return new LastSeenObservable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}

代码示例来源:origin: bitrich-info/xchange-stream

public Observable<BitmexWebSocketTransaction> subscribeBitmexChannel(String channelName) {
  return subscribeChannel(channelName).map(s -> {
    BitmexWebSocketTransaction transaction = objectMapper.treeToValue(s, BitmexWebSocketTransaction.class);
    return transaction;
  })
      .share();
}

代码示例来源:origin: manas-chaudhari/android-mvvm

public ViewPagerAdapter(@NonNull Observable<List<ViewModel>> viewModels, @NonNull ViewProvider viewProvider, @NonNull ViewModelBinder binder) {
  source = viewModels
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext(new Consumer<List<ViewModel>>() {
        @Override
        public void accept(List<ViewModel> viewModels) throws Exception {
          latestViewModels = (viewModels != null) ? viewModels : new ArrayList<ViewModel>();
          notifyDataSetChanged();
        }
      })
      .share();
  this.viewProvider = viewProvider;
  this.binder = binder;
}

代码示例来源:origin: bitrich-info/xchange-stream

@Override
  public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
    Observable<PoloniexWebSocketTradeEvent> subscribedTrades = service.subscribeCurrencyPairChannel(currencyPair)
        .filter(s -> s.getEventType().equals("t"))
        .map(s -> (PoloniexWebSocketTradeEvent) s).share();

    return subscribedTrades
        .map(s -> adaptPoloniexPublicTrade(s.toPoloniexPublicTrade(currencyPair), currencyPair));
  }
}

代码示例来源:origin: bitrich-info/xchange-stream

public Observable<JsonNode> subscribeChannel(String channelName) {
  LOG.info("Subscribing to channel {}.", channelName);
  return Observable.<JsonNode>create(e -> {
    if (!subscriptions.containsKey(channelName)) {
      subscriptions.put(channelName, e);
      pubnub.subscribe().channels(Collections.singletonList(channelName)).execute();
      LOG.debug("Subscribe channel: {}", channelName);
    }
  }).doOnDispose(() -> {
    LOG.debug("Unsubscribe channel: {}", channelName);
    pubnub.unsubscribe().channels(Collections.singletonList(channelName)).execute();
  }).share();
}

代码示例来源:origin: info.bitrich.xchange-stream/service-pubnub

public Observable<JsonNode> subscribeChannel(String channelName) {
  LOG.info("Subscribing to channel {}.", channelName);
  return Observable.<JsonNode>create(e -> {
    if (!subscriptions.containsKey(channelName)) {
      subscriptions.put(channelName, e);
      pubnub.subscribe().channels(Collections.singletonList(channelName)).execute();
      LOG.debug("Subscribe channel: {}", channelName);
    }
  }).doOnDispose(() -> {
    LOG.debug("Unsubscribe channel: {}", channelName);
    pubnub.unsubscribe().channels(Collections.singletonList(channelName)).execute();
  }).share();
}

代码示例来源:origin: bitrich-info/xchange-stream

public Observable<PoloniexWebSocketEvent> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
  String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
  return subscribeChannel(channelName)
      .flatMapIterable(s -> {
        PoloniexWebSocketEventsTransaction transaction = objectMapper.treeToValue(s, PoloniexWebSocketEventsTransaction.class);
        return Arrays.asList(transaction.getEvents());
      }).share();
}

代码示例来源:origin: graphql-java/graphql-java-subscription-example

public StockTickerPublisher() {
  Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
  });
  ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
  connectableObservable.connect();
  publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}

代码示例来源:origin: com.jakewharton.rx2/replaying-share

@Override public Observable<T> apply(Observable<T> upstream) {
 LastSeen<T> lastSeen = new LastSeen<>();
 return new LastSeenObservable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}

代码示例来源:origin: manas-chaudhari/android-mvvm

private ReadOnlyField(@NonNull Observable<T> source) {
  super();
  this.source = source.doOnNext(new Consumer<T>() {
    @Override
    public void accept(T t) throws Exception {
      ReadOnlyField.super.set(t);
    }
  }).doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
      Log.e("ReadOnlyField", "onError in source observable", throwable);
    }
  }).onErrorResumeNext(Observable.<T>empty()).share();
}

代码示例来源:origin: networknt/light-example-4j

public ChannelPublisher() {
  Observable<MessageAddedEvent> messageAddedEventObservable = Observable.create(messageAddedEventObservableEmitter -> {
    this.emitter = messageAddedEventObservableEmitter;
  });
  ConnectableObservable<MessageAddedEvent> connectableObservable = messageAddedEventObservable.share().publish();
  connectableObservable.connect();
  publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void test() {
    Observable<Integer> myObservable = Observable.just(1)
        .<Integer>flatMap(i -> {
          throw new IllegalStateException();
        }).share();

      myObservable
        .zipWith(myObservable, Pair::of)
        .subscribe(pair -> {
          //ignore
        }, throwable -> {
          //ignore
        });
  }
}

相关文章

Observable类方法