io.reactivex.Observable.doOnComplete()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.8k)|赞(0)|评价(0)|浏览(160)

本文整理了Java中io.reactivex.Observable.doOnComplete()方法的一些代码示例,展示了Observable.doOnComplete()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnComplete()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnComplete

Observable.doOnComplete介绍

[英]Modifies the source ObservableSource so that it invokes an action when it calls onComplete.

Scheduler: doOnComplete does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便在调用onComplete时调用操作。
调度器:默认情况下,doOnComplete不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Observable<Integer> w) {
    return w.startWith(indicator)
        .doOnComplete(new Action() {
          @Override
          public void run() {
            System.out.println("inner done: " + wip.incrementAndGet());
          }
        })
        ;
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnCompleteNull() {
  just1.doOnComplete(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnCompleted() {
  final AtomicBoolean r = new AtomicBoolean();
  String output = Observable.just("one").doOnComplete(new Action() {
    @Override
    public void run() {
      r.set(true);
    }
  }).blockingSingle();
  assertEquals("one", output);
  assertTrue(r.get());
}

代码示例来源:origin: ReactiveX/RxJava

private static <T> Observable<T> composer(Observable<T> source, final AtomicInteger subscriptionCount, final int m) {
  return source.doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable d) {
        int n = subscriptionCount.getAndIncrement();
        if (n >= m) {
          Assert.fail("Too many subscriptions! " + (n + 1));
        }
    }
  }).doOnComplete(new Action() {
    @Override
    public void run() {
        int n = subscriptionCount.decrementAndGet();
        if (n < 0) {
          Assert.fail("Too many unsubscriptions! " + (n - 1));
        }
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onCompleteCrash() {
  Observable.wrap(new ObservableSource<Object>() {
    @Override
    public void subscribe(Observer<? super Object> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .test()
  .assertFailure(IOException.class);
}

代码示例来源:origin: ReactiveX/RxJava

.doOnNext(sourceNext)
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();

代码示例来源:origin: ReactiveX/RxJava

.doOnNext(sourceNext)
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.replay();

代码示例来源:origin: ReactiveX/RxJava

.window(300, TimeUnit.MILLISECONDS)
.take(10)
.doOnComplete(new Action() {
  @Override
  public void run() {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onCompleteCrashConditional() {
  Observable.wrap(new ObservableSource<Object>() {
    @Override
    public void subscribe(Observer<? super Object> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .filter(Functions.alwaysTrue())
  .test()
  .assertFailure(IOException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
    @Override
    public Observable<String> apply(Resource resource) {
      return Observable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> o = Observable.using(resourceFactory, observableFactory,
      new DisposeAction(), true)
  .doOnDispose(unsub)
  .doOnComplete(completion);
  o.safeSubscribe(observer);
  assertEquals(Arrays.asList("disposed", "completed" /* , "unsub" */), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Action completion = createOnCompletedAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
    @Override
    public Observable<String> apply(Resource resource) {
      return Observable.fromArray(resource.getTextFromWeb().split(" "));
    }
  };
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> o = Observable.using(resourceFactory, observableFactory,
      new DisposeAction(), false)
  .doOnDispose(unsub)
  .doOnComplete(completion);
  o.safeSubscribe(observer);
  assertEquals(Arrays.asList("completed", /*"unsub",*/ "disposed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedOnErrorCrash() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0 };
  Observable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[0]++;
    }
  })
  .subscribe(to);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertFailure(TestException.class);
  assertEquals(0, call[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fused() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  Observable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .subscribe(to);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

ConnectableObservable<Integer> is = Observable.range(1, Flowable.bufferSize() * 2).publish();
Observable<Integer> fast = is.observeOn(Schedulers.computation())
.doOnComplete(new Action() {
  @Override
  public void run() {
}).doOnComplete(new Action() {

代码示例来源:origin: ReactiveX/RxJava

@Override
public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
  if (group.getKey() < 3) {
    return group.map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "first groups: " + t1;
      }
    })
        // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
        .take(2).doOnComplete(new Action() {
          @Override
          public void run() {
            first.countDown();
          }
        });
  } else {
    return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
      @Override
      public String apply(Integer t1) {
        return "last group: " + t1;
      }
    }).doOnEach(new Consumer<Notification<String>>() {
      @Override
      public void accept(Notification<String> t1) {
        System.err.println("subscribeOn notification => " + t1);
      }
    });
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedAsync() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  UnicastSubject<Integer> up = UnicastSubject.create();
  up
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .subscribe(to);
  TestHelper.emit(up, 1, 2, 3, 4, 5);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedOnErrorCrashConditional() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0 };
  Observable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[0]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(to);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertFailure(TestException.class);
  assertEquals(0, call[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedConditional() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  Observable.range(1, 5)
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(to);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedAsyncConditional() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  UnicastSubject<Integer> up = UnicastSubject.create();
  up
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(to);
  TestHelper.emit(up, 1, 2, 3, 4, 5);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Fusion not supported yet") // TODO decide/implement fusion
public void fusedAsyncConditional2() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  final int[] call = { 0, 0 };
  UnicastSubject<Integer> up = UnicastSubject.create();
  up.hide()
  .doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      call[0]++;
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      call[1]++;
    }
  })
  .filter(Functions.alwaysTrue())
  .subscribe(to);
  TestHelper.emit(up, 1, 2, 3, 4, 5);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, call[0]);
  assertEquals(1, call[1]);
}

相关文章

Observable类方法