本文整理了Java中io.reactivex.Observable.blockingIterable()
方法的一些代码示例,展示了Observable.blockingIterable()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.blockingIterable()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:blockingIterable
[英]Converts this Observable into an Iterable.
Scheduler: blockingIterable does not operate by default on a particular Scheduler.
[中]将此可观察项转换为可观察项。
调度器:blockingIterable默认情况下不会在特定的调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Integer apply(Integer v) throws Exception {
Observable.just(1).delay(10, TimeUnit.SECONDS).blockingIterable().iterator().next();
return v;
}
})
代码示例来源:origin: ReactiveX/RxJava
Iterator<T> it = blockingIterable().iterator();
while (it.hasNext()) {
try {
代码示例来源:origin: ReactiveX/RxJava
/**
* Converts this {@code Observable} into an {@link Iterable}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingIterable.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingIterable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return an {@link Iterable} version of this {@code Observable}
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingIterable() {
return blockingIterable(bufferSize());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testWhenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<Observable<String>>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
List<String> actual = new ArrayList<String>();
while (iter.hasNext()) {
actual.add(iter.next());
}
assertEquals(expected, actual);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testRangeWithOverflow5() {
assertFalse(Observable.rangeLong(Long.MIN_VALUE, 0).blockingIterable().iterator().hasNext());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testRangeWithOverflow5() {
assertFalse(Observable.range(Integer.MIN_VALUE, 0).blockingIterable().iterator().hasNext());
}
代码示例来源:origin: redisson/redisson
/**
* Converts this {@code Observable} into an {@link Iterable}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingIterable.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingIterable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return an {@link Iterable} version of this {@code Observable}
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingIterable() {
return blockingIterable(bufferSize());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testToIterator() {
Observable<String> obs = Observable.just("one", "two", "three");
Iterator<String> it = obs.blockingIterable().iterator();
assertEquals(true, it.hasNext());
assertEquals("one", it.next());
assertEquals(true, it.hasNext());
assertEquals("two", it.next());
assertEquals(true, it.hasNext());
assertEquals("three", it.next());
assertEquals(false, it.hasNext());
}
代码示例来源:origin: redisson/redisson
Iterator<T> it = blockingIterable().iterator();
while (it.hasNext()) {
try {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testMergeALotOfSourcesOneByOneSynchronously() {
int n = 10000;
List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(n);
for (int i = 0; i < n; i++) {
sourceList.add(Observable.just(i));
}
Iterator<Integer> it = Observable.merge(Observable.fromIterable(sourceList), 1).blockingIterable().iterator();
int j = 0;
while (it.hasNext()) {
assertEquals((Integer)j, it.next());
j++;
}
assertEquals(j, n);
}
代码示例来源:origin: ReactiveX/RxJava
@Ignore("subscribe() should not throw")
@Test(expected = TestException.class)
public void testExceptionThrownFromOnSubscribe() {
Iterable<String> strings = Observable.unsafeCreate(new ObservableSource<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
throw new TestException("intentional");
}
}).blockingIterable();
for (String string : strings) {
// never reaches here
System.out.println(string);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testMergeALotOfSourcesOneByOneSynchronouslyTakeHalf() {
int n = 10000;
List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(n);
for (int i = 0; i < n; i++) {
sourceList.add(Observable.just(i));
}
Iterator<Integer> it = Observable.merge(Observable.fromIterable(sourceList), 1).take(n / 2).blockingIterable().iterator();
int j = 0;
while (it.hasNext()) {
assertEquals((Integer)j, it.next());
j++;
}
assertEquals(j, n / 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = TestException.class)
public void testToIteratorWithException() {
Observable<String> obs = Observable.unsafeCreate(new ObservableSource<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
observer.onSubscribe(Disposables.empty());
observer.onNext("one");
observer.onError(new TestException());
}
});
Iterator<String> it = obs.blockingIterable().iterator();
assertEquals(true, it.hasNext());
assertEquals("one", it.next());
assertEquals(true, it.hasNext());
it.next();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testMaxConcurrent() {
for (int times = 0; times < 100; times++) {
int observableCount = 100;
// Test maxConcurrent from 2 to 12
int maxConcurrent = 2 + (times % 10);
AtomicInteger subscriptionCount = new AtomicInteger(0);
List<Observable<String>> os = new ArrayList<Observable<String>>();
List<SubscriptionCheckObservable> scos = new ArrayList<SubscriptionCheckObservable>();
for (int i = 0; i < observableCount; i++) {
SubscriptionCheckObservable sco = new SubscriptionCheckObservable(subscriptionCount, maxConcurrent);
scos.add(sco);
os.add(Observable.unsafeCreate(sco));
}
Iterator<String> iter = Observable.merge(os, maxConcurrent).blockingIterable().iterator();
List<String> actual = new ArrayList<String>();
while (iter.hasNext()) {
actual.add(iter.next());
}
// System.out.println("actual: " + actual);
assertEquals(5 * observableCount, actual.size());
for (SubscriptionCheckObservable sco : scos) {
assertFalse(sco.failed);
}
}
}
代码示例来源:origin: akarnokd/RxJava2Jdk8Interop
/**
* Returns a blocking Stream of the elements of the Observable.
* <p>
* Closing the Stream will cancel the flow.
* @param <T> the value type
* @return the Function to be used with {@code Observable.to}.
*/
public static <T> Function<Observable<T>, Stream<T>> toStream() {
return f -> ZeroOneIterator.toStream(f.blockingIterable().iterator());
}
代码示例来源:origin: PacktPublishing/Learning-RxJava
@Test
public void testIterable() {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Zeta");
Iterable<String> allWithLengthFive = source.filter(s ->
s.length() == 5)
.blockingIterable();
for (String s: allWithLengthFive) {
assertTrue(s.length() == 5);
}
}
}
代码示例来源:origin: xjdr/xio
@Test
public void testHttp1toHttp1ServerPostMany() throws Exception {
setupClient(NUM_REQUESTS, true);
setupFrontBack(false, false);
verify(multipleAsyncRequests(true).blockingIterable());
assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
assertProxiedRequests(NUM_REQUESTS * 2);
}
代码示例来源:origin: xjdr/xio
@Test
public void testHttp2toHttp1ServerGetMany() throws Exception {
setupClient(NUM_REQUESTS, true);
setupFrontBack(true, false);
verify(multipleAsyncRequests(false).blockingIterable());
assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
assertProxiedRequests(NUM_REQUESTS * 2);
}
代码示例来源:origin: xjdr/xio
@Test
public void testHttp1toHttp2ServerPostMany() throws Exception {
setupClient(NUM_REQUESTS, false);
setupFrontBack(false, true);
verify(multipleAsyncRequests(true).blockingIterable());
assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
assertProxiedRequests(NUM_REQUESTS * 2);
}
代码示例来源:origin: xjdr/xio
@Test
public void testHttp1toHttp1ServerGetMany() throws Exception {
setupClient(NUM_REQUESTS, true);
setupFrontBack(false, false);
verify(multipleAsyncRequests(false).blockingIterable());
assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
assertProxiedRequests(NUM_REQUESTS * 2);
}
内容来源于网络,如有侵权,请联系作者删除!