io.reactivex.Observable.cache()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.6k)|赞(0)|评价(0)|浏览(228)

本文整理了Java中io.reactivex.Observable.cache()方法的一些代码示例,展示了Observable.cache()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.cache()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:cache

Observable.cache介绍

[英]Returns an Observable that subscribes to this ObservableSource lazily, caches all of its events and replays them, in the same order as received, to all the downstream subscribers.

This is useful when you want an ObservableSource to cache responses and you can't control the subscribe/dispose behavior of all the Observers.

The operator subscribes only when the first downstream subscriber subscribes and maintains a single subscription towards this ObservableSource. In contrast, the operator family of #replay()that return a ConnectableObservable require an explicit call to ConnectableObservable#connect().

Note: You sacrifice the ability to dispose the origin when you use the cacheObserver so be careful not to use this Observer on ObservableSources that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.subscribe(...);

Since the operator doesn't allow clearing the cached values either, the possible workaround is to forget all references to it via #onTerminateDetach() applied along with the previous workaround:

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.subscribe(...);

Scheduler: cache does not operate by default on a particular Scheduler.
[中]返回延迟订阅此ObservableSource的Observable,缓存其所有事件,并按照接收到的相同顺序将其重播给所有下游订阅方。
当您希望ObservateSource缓存响应,并且无法控制所有观察者的订阅/处置行为时,这非常有用。
只有当第一个下游订户订阅并维护对该可观察资源的单一订阅时,运营商才会订阅。相反,返回ConnectableObservable的#replay()运算符族需要显式调用ConnectableObservable#connect()。
*注意:*当您使用cacheObserver时,您牺牲了处理原点的能力,因此请小心不要在发出无限或非常大量项的Observesource上使用此Observator,这将耗尽内存。一种可能的解决方法是在应用cache()之前(可能之后)使用谓词或其他源应用“takeUntil”。

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.subscribe(...);

由于操作员也不允许清除缓存的值,因此可能的解决方法是通过与前面的解决方法一起应用的#onTerminateDetach()忘记对它的所有引用:

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.subscribe(...);

调度程序:缓存默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: commonsguy/cw-omnibus

@Override
public void onActivityResult(int requestCode, int resultCode,
               Intent resultData) {
 if (resultCode==Activity.RESULT_OK) {
  docObservable=Observable
   .defer(() -> (Observable.just(createDurableContent(resultData))))
   .subscribeOn(Schedulers.io())
   .cache()
   .observeOn(AndroidSchedulers.mainThread());
  docSub();
 }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
  o.subscribe();
  o.subscribe();
  o.subscribe();
  verify(unsubscribe, never()).run();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public Observable<Integer> apply(final Integer i) {
  return Observable.unsafeCreate(new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> observer) {
      observer.onSubscribe(Disposables.empty());
      if (i < 500) {
        try {
          Thread.sleep(1);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      observer.onNext(i);
      observer.onComplete();
    }
  }).subscribeOn(Schedulers.computation()).cache();
}

代码示例来源:origin: commonsguy/cw-omnibus

.cache()
.observeOn(AndroidSchedulers.mainThread());

代码示例来源:origin: commonsguy/cw-omnibus

@Override
public void onCreate(Bundle savedInstanceState) {
 super.onCreate(savedInstanceState);
 setContentView(R.layout.main);
 final ViewPager pager=findViewById(R.id.pager);
 observable=(Observable<PermissionRoster>)getLastCustomNonConfigurationInstance();
 if (observable==null) {
  observable=Observable
   .create(new PermissionSource(this))
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .cache();
 }
 sub=observable.subscribe(new Consumer<PermissionRoster>() {
   @Override
   public void accept(PermissionRoster roster) throws Exception {
    pager.setAdapter(new PermissionTabAdapter(MainActivity.this,
     getSupportFragmentManager(), roster));
   }
  }, new Consumer<Throwable>() {
   @Override
   public void accept(Throwable error) throws Exception {
    Toast
     .makeText(MainActivity.this, error.getMessage(), Toast.LENGTH_LONG)
     .show();
    Log.e(getClass().getSimpleName(), "Exception processing request",
     error);
   }
  });
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Make sure emission-subscription races are handled correctly.
 * https://github.com/ReactiveX/RxJava/issues/1147
 */
@Test
public void testRaceForTerminalState() {
  final List<Integer> expected = Arrays.asList(1);
  for (int i = 0; i < 100000; i++) {
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(to);
    to.awaitTerminalEvent();
    to.assertValueSequence(expected);
    to.assertTerminated();
  }
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Make sure emission-subscription races are handled correctly.
 * https://github.com/ReactiveX/RxJava/issues/1147
 */
@Test
public void testRaceForTerminalState() {
  final List<Integer> expected = Arrays.asList(1);
  for (int i = 0; i < 100000; i++) {
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(to);
    to.awaitTerminalEvent();
    to.assertValueSequence(expected);
    to.assertTerminated();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.range(1, 5).cache());
}

代码示例来源:origin: ReactiveX/RxJava

}).cache();

代码示例来源:origin: ReactiveX/RxJava

}).cache();

代码示例来源:origin: ReactiveX/RxJava

@Test
public void disposeOnArrival() {
  Observable.range(1, 5).cache()
  .test(true)
  .assertEmpty();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void cancelledUpFront() {
    final AtomicInteger call = new AtomicInteger();
    Observable<Object> f = Observable.fromCallable(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return call.incrementAndGet();
      }
    }).concatWith(Observable.never())
    .cache();

    f.test().assertValuesOnly(1);

    f.test(true)
    .assertEmpty();

    assertEquals(1, call.get());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("2.x consumers are not allowed to throw")
public void unsafeChildThrows() {
  final AtomicInteger count = new AtomicInteger();
  Observable<Integer> source = Observable.range(1, 100)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) {
      count.getAndIncrement();
    }
  })
  .cache();
  TestObserver<Integer> to = new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      throw new TestException();
    }
  };
  source.subscribe(to);
  Assert.assertEquals(100, count.get());
  to.assertNoValues();
  to.assertNotComplete();
  to.assertError(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testNoMissingBackpressureException() {
  final int m = 4 * 1000 * 1000;
  Observable<Integer> firehose = Observable.unsafeCreate(new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> t) {
      t.onSubscribe(Disposables.empty());
      for (int i = 0; i < m; i++) {
        t.onNext(i);
      }
      t.onComplete();
    }
  });
  TestObserver<Integer> to = new TestObserver<Integer>();
  firehose.cache().observeOn(Schedulers.computation()).takeLast(100).subscribe(to);
  to.awaitTerminalEvent(3, TimeUnit.SECONDS);
  to.assertNoErrors();
  to.assertComplete();
  assertEquals(100, to.values().size());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Observable<Integer> cache = Observable.range(1, 5).cache();
  cache.take(2).test().assertResult(1, 2);
  cache.take(3).test().assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testValuesAndThenError() {
  Observable<Integer> source = Observable.range(1, 10)
      .concatWith(Observable.<Integer>error(new TestException()))
      .cache();
  TestObserver<Integer> to = new TestObserver<Integer>();
  source.subscribe(to);
  to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  to.assertNotComplete();
  to.assertError(TestException.class);
  TestObserver<Integer> to2 = new TestObserver<Integer>();
  source.subscribe(to2);
  to2.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  to2.assertNotComplete();
  to2.assertError(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void observers() {
  PublishSubject<Integer> ps = PublishSubject.create();
  ObservableCache<Integer> cache = (ObservableCache<Integer>)Observable.range(1, 5).concatWith(ps).cache();
  assertFalse(cache.hasObservers());
  assertEquals(0, cache.cachedEventCount());
  TestObserver<Integer> to = cache.test();
  assertTrue(cache.hasObservers());
  assertEquals(5, cache.cachedEventCount());
  ps.onComplete();
  to.assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: com.github.davidmoten/rxjava2-extras

public void subscribe(final Observer<? super T> observer) {
  if (refresh.compareAndSet(true, false)) {
    current = source.cache();
  }
  current.subscribe(observer);
}

代码示例来源:origin: davidmoten/rxjava2-extras

public void subscribe(final Observer<? super T> observer) {
  if (refresh.compareAndSet(true, false)) {
    current = source.cache();
  }
  current.subscribe(observer);
}

代码示例来源:origin: com.firstdata.clovergo/domain

public Observable<EmployeeMerchant> getCachedObservable() {
    repository.log("get sdk merchants info");
      if (cachedObservable == null) {
        return repository.getSDKMerchantsInfo().doOnNext(employeeMerchant -> repository.storeInCache(UseCaseConstants.EMPLOYEE_MERCHANT, employeeMerchant)).doOnError(throwable -> {
          Employee employee = Employee.createEmployee(null);
          Merchant merchant = Merchant.createMerchant(null);
          repository.storeInCache(UseCaseConstants.EMPLOYEE_MERCHANT, EmployeeMerchant.createEmployeeMerchant(employee,merchant,true));
          cachedObservable = null;
        }).cache();
      }
      return cachedObservable;
  }
}

相关文章

Observable类方法