Usage and code examples for the io.reactivex.Observable.subscribeOn() method

Reposted by x33g5p2x on 2022-01-25, category: Other

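This article collects code examples for the Java method io.reactivex.Observable.subscribeOn() and shows how Observable.subscribeOn() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should serve as useful references. Details of the Observable.subscribeOn() method are as follows:
Fully qualified class name: io.reactivex.Observable
Class name: Observable
Method name: subscribeOn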

About Observable.subscribeOn

Asynchronously subscribes Observers to this ObservableSource on the specified Scheduler.

Scheduler: You specify which Scheduler this operator will use.
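To make the description above concrete, here is a minimal sketch of the idea behind subscribeOn, written with the Java standard library only. It is a toy model, not RxJava's implementation: the `Source` interface, the `subscribeOn` helper, and the thread name `sketch-worker` are hypothetical names introduced for illustration, and a plain `Executor` stands in for a Scheduler. The point it tries to show is that the subscription logic of the upstream source runs on the supplied worker rather than on the calling thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SubscribeOnSketch {

    /** Hypothetical stand-in for ObservableSource: pushes values to the observer when subscribed. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Toy subscribeOn: defers the upstream subscribe() call to the given executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Subscribes through the toy operator and reports which thread ran the source. */
    static String subscribingThreadName() {
        // The executor plays the role of a Scheduler in this sketch (an assumption, not RxJava API).
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        try {
            // The source emits the name of whichever thread executes its subscribe logic.
            Source<String> source = observer -> observer.accept(Thread.currentThread().getName());

            AtomicReference<String> seen = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);

            // The caller returns immediately; the emission arrives from the worker thread.
            subscribeOn(source, worker).subscribe(name -> {
                seen.set(name);
                done.countDown();
            });

            done.await();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the emission", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(subscribingThreadName()); // prints "sketch-worker"
    }
}
```

In RxJava itself the analogous call is `source.subscribeOn(scheduler)`, as the snippets in this article show; the sketch only mirrors the threading effect under the assumptions stated above.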

Code examples

Code example source: ReactiveX/RxJava

@Override
public Observable<Integer> apply(Integer t) {
  Observable<Integer> o = Observable.just(t).subscribeOn(sch);
  Subject<Integer> subject = UnicastSubject.create();
  o.subscribe(subject);
  return subject;
}

Code example source: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t) {
    return Observable.just(1).subscribeOn(Schedulers.computation());
  }
}).subscribe(to);

Code example source: ReactiveX/RxJava

@Override
public Observable<String> apply(Throwable t1) {
  return Observable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation());
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void subscribeOnNull() {
  just1.subscribeOn(null);
}

Code example source: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t1) {
    return composer(Observable.range(t1 * 10, 2), subscriptionCount, m)
        .subscribeOn(Schedulers.computation());
  }
}, new BiFunction<Integer, Integer, Integer>() {

Code example source: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t1) {
    return composer(Observable.range(t1 * 10, 2), subscriptionCount, m)
        .subscribeOn(Schedulers.computation());
  }
}, m);

Code example source: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t) {
    return Observable.range(1, 1000).subscribeOn(Schedulers.computation());
  }
}).observeOn(Schedulers.newThread()).subscribe(to);

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void testTake() throws Exception {
  List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(3);

  sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));
  sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));
  sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));

  TestObserver<Integer> to = new TestObserver<Integer>();

  Observable.merge(sourceList, 2).take(5).subscribe(to);

  to.awaitTerminalEvent();
  to.assertNoErrors();
  to.assertValueCount(5);
}

Code example source: ReactiveX/RxJava

@Override
public Observable<Integer> apply(Integer i) {
  return Observable.range(1, innerSize).subscribeOn(Schedulers.computation());
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).subscribeOn(Schedulers.single()));
}

Code example source: ReactiveX/RxJava

@Test(/* timeout = 1000, */expected = RuntimeException.class)
public void testHasNextThrows() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  it.hasNext();
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000, expected = RuntimeException.class)
public void testNextThrows() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  it.next();
}

Code example source: ReactiveX/RxJava

@Override
  public CompletableSource apply(Integer v) throws Exception {
    return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
  }
}).toObservable()

Code example source: ReactiveX/RxJava

@Override
  public CompletableSource apply(Integer v) throws Exception {
    return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
  }
})

Code example source: ReactiveX/RxJava

@Test
public void blockingSubscribeConsumerConsumer() {
  final List<Object> list = new ArrayList<Object>();
  Observable.range(1, 5)
  .subscribeOn(Schedulers.computation())
  .blockingSubscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer v) throws Exception {
      list.add(v);
    }
  }, Functions.emptyConsumer());
  assertEquals(Arrays.asList(1, 2, 3, 4, 5), list);
}

Code example source: ReactiveX/RxJava

@Test
public void blockingFirstDefault() {
  assertEquals(1, Observable.<Integer>empty()
      .subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}

Code example source: ReactiveX/RxJava

@Test
public void blockingFirst() {
  assertEquals(1, Observable.range(1, 10)
      .subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 20000)
public void testNoStackOverFlow() {
  Observable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Observable<Integer> xs = Observable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

Code example source: ReactiveX/RxJava

@Test
public void testOnError() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onError(new RuntimeException("fail"));
    }
  }).subscribeOn(Schedulers.computation()).subscribe(to);
  to.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
  to.assertTerminated();
}
