io.reactivex.Observable.doOnDispose()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.6k)|赞(0)|评价(0)|浏览(166)

本文整理了Java中io.reactivex.Observable.doOnDispose()方法的一些代码示例,展示了Observable.doOnDispose()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnDispose()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnDispose

Observable.doOnDispose介绍

[英]Calls the dispose Action if the downstream disposes the sequence.

The action is shared between subscriptions and thus may be called concurrently from multiple threads; the action must be thread safe.

If the action throws a runtime exception, that exception is rethrown by the dispose() call, sometimes as a CompositeException if there were multiple exceptions along the way.

Scheduler: doOnDispose does not operate by default on a particular Scheduler.
[中]如果下游处理序列,则调用dispose操作。
该操作在订阅之间共享,因此可以从多个线程同时调用;操作必须是线程安全的。
如果该操作引发运行时异常,则该异常将由dispose()调用重新引发,如果在此过程中出现多个异常,有时会作为CompositeException。
调度器:默认情况下,doOnDispose不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Integer> apply(Integer v) throws Exception {
    return Observable.just(v).doOnDispose(new Action() {
      @Override
      public void run() throws Exception {
        counter.getAndIncrement();
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnDisposeNull() {
  just1.doOnDispose(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
  o.subscribe();
  o.subscribe();
  o.subscribe();
  verify(unsubscribe, never()).run();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).replay().autoConnect();
  o.subscribe();
  o.subscribe();
  o.subscribe();
  verify(unsubscribe, never()).run();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void onNext(Integer t) {
    if (valueCount() == 2) {
      source.doOnDispose(new Action() {
        @Override
        public void run() {
          child2Unsubscribed.set(true);
        }
      }).take(5).subscribe(to2);
    }
    super.onNext(t);
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribesFromUpstream() {
  final AtomicBoolean unsubscribed = new AtomicBoolean(false);
  Action unsubscribeAction = new Action() {
    @Override
    public void run() {
      unsubscribed.set(true);
    }
  };
  Observable.just(1)
  .concatWith(Observable.<Integer>never())
  .doOnDispose(unsubscribeAction)
  .takeLast(1)
  .subscribe()
  .dispose();
  assertTrue(unsubscribed.get());
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
  @Test
  public void noCancelPreviousIterable() {
    final AtomicInteger counter = new AtomicInteger();

    Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
      @Override
      public void run() throws Exception {
        counter.getAndIncrement();
      }
    });

    Observable.concat(Arrays.asList(source, source, source, source, source))
    .test()
    .assertResult(1, 1, 1, 1, 1);

    assertEquals(0, counter.get());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void noCancelPreviousRepeat() {
  final AtomicInteger counter = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.repeat(5)
  .test()
  .assertResult(1, 1, 1, 1, 1);
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noCancelPreviousArray() {
  final AtomicInteger counter = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  Observable.concatArray(source, source, source, source, source)
  .test()
  .assertResult(1, 1, 1, 1, 1);
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void noCancelPreviousRepeatUntil() {
  final AtomicInteger counter = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  final AtomicInteger times = new AtomicInteger();
  source.repeatUntil(new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
      return times.getAndIncrement() == 4;
    }
  })
  .test()
  .assertResult(1, 1, 1, 1, 1);
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribesFromUpstreamObservable() {
  final AtomicBoolean unsub = new AtomicBoolean();
  Observable.range(1, 10).concatWith(Observable.<Integer>never())
  .doOnDispose(new Action() {
    @Override
    public void run() {
      unsub.set(true);
    }})
    .ignoreElements()
    .toObservable()
    .subscribe()
    .dispose();
  assertTrue(unsub.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribesFromUpstream() {
  final AtomicBoolean unsub = new AtomicBoolean();
  Observable.range(1, 10).concatWith(Observable.<Integer>never())
  .doOnDispose(new Action() {
    @Override
    public void run() {
      unsub.set(true);
    }})
    .ignoreElements()
    .subscribe()
    .dispose();
  assertTrue(unsub.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() throws Exception {
  Action onCancel = mock(Action.class);
  Observable.range(1, 5)
  .doOnDispose(onCancel)
  .throttleLatest(1, TimeUnit.MINUTES)
  .take(1)
  .test()
  .assertResult(1);
  verify(onCancel).run();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelSkip() {
  final AtomicInteger counter = new AtomicInteger();
  Observable.<Integer>empty()
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  })
  .buffer(5, 10, TimeUnit.SECONDS)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(Collections.<Integer>emptyList());
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelOverlap() {
  final AtomicInteger counter = new AtomicInteger();
  Observable.<Integer>empty()
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  })
  .buffer(10, 5, TimeUnit.SECONDS)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(Collections.<Integer>emptyList());
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelExact() {
  final AtomicInteger counter = new AtomicInteger();
  Observable.<Integer>empty()
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  })
  .buffer(5, TimeUnit.SECONDS)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(Collections.<Integer>emptyList());
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normal() {
  final int[] calls = { 0 };
  Observable.just(1)
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      calls[0]++;
    }
  })
  .unsubscribeOn(Schedulers.single())
  .test()
  .assertResult(1);
  assertEquals(0, calls[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
    @Override
    public Observable<String> apply(Resource resource) {
      return Observable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> o = Observable.using(resourceFactory, observableFactory,
      new DisposeAction(), true)
  .doOnDispose(unsub)
  .doOnComplete(completion);
  o.safeSubscribe(observer);
  assertEquals(Arrays.asList("disposed", "completed" /* , "unsub" */), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
    @Override
    public Observable<String> apply(Resource resource) {
      return Observable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> o = Observable.using(resourceFactory, observableFactory,
      new DisposeAction(), false)
  .doOnDispose(unsub)
  .doOnComplete(completion);
  o.safeSubscribe(observer);
  assertEquals(Arrays.asList("completed", /*"unsub",*/ "disposed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  final int[] calls = { 0 };
  Observable.error(new TestException())
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      calls[0]++;
    }
  })
  .unsubscribeOn(Schedulers.single())
  .test()
  .assertFailure(TestException.class);
  assertEquals(0, calls[0]);
}

相关文章

Observable类方法