io.reactivex.Observable.window()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(6.3k)|赞(0)|评价(0)|浏览(110)

本文整理了Java中io.reactivex.Observable.window()方法的一些代码示例,展示了Observable.window()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.window()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:window

Observable.window介绍

[英]Returns an Observable that emits windows of items it collects from the source ObservableSource. The resulting ObservableSource emits connected, non-overlapping windows, each containing count items. When the source ObservableSource completes or encounters an error, the resulting ObservableSource emits the current window and propagates the notification from the source ObservableSource.

Scheduler: This version of window does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出它从源ObserveSource收集的项目窗口。由此产生的ObservableSource发出连接的、不重叠的窗口,每个窗口都包含计数项。当源ObservableSource完成或遇到错误时,生成的ObservableSource将发出当前窗口,并从源ObservableSource传播通知。
调度程序:默认情况下,此版本的Windows不会在特定的调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Observable<Object>> apply(Observable<Object> o) throws Exception {
    return o.window(1, 2);
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Observable<Object>> apply(Observable<Object> o) throws Exception {
    return o.window(1);
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void windowBoundarySupplierReturnsNull() {
  just1.window(new Callable<Observable<Object>>() {
    @Override
    public Observable<Object> call() {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Observable<Object> o) throws Exception {
    return o.window(Observable.never()).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
      @Override
      public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
        return v;
      }
    });
  }
}, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Observable<Object>> apply(Observable<Object> f)
      throws Exception {
    return f.window(Observable.never()).takeLast(1);
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void windowOpenCloseCloseReturnsNull() {
  Observable.never().window(just1, new Function<Integer, Observable<Object>>() {
    @Override
    public Observable<Object> apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Observable<Object> o) throws Exception {
    return o.window(Functions.justCallable(Observable.never())).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
      @Override
      public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
        return v;
      }
    });
  }
}, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Observable<Object>> apply(Observable<Object> f)
      throws Exception {
    return f.window(Functions.justCallable(Observable.never())).takeLast(1);
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSkipAndCountGaplessWindows() {
  Observable<String> subject = Observable.just("one", "two", "three", "four", "five");
  Observable<Observable<String>> windowed = subject.window(3, 3);
  List<List<String>> windows = toLists(windowed);
  assertEquals(2, windows.size());
  assertEquals(list("one", "two", "three"), windows.get(0));
  assertEquals(list("four", "five"), windows.get(1));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.range(1, 5).window(1, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Observable.range(1, 5).window(2, 1, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Observable.range(1, 5).window(1, 2, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Observable.never()
      .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timespanTimeskipCustomScheduler() {
  Observable.just(1)
  .window(1, 1, TimeUnit.MINUTES, Schedulers.io())
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timespanTimeskipDefaultScheduler() {
  Observable.just(1)
  .window(1, 1, TimeUnit.MINUTES)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timeskipJustOverlap() {
  Observable.just(1)
  .window(2, 1, TimeUnit.MINUTES, Schedulers.single())
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void timespanTimeskipCustomSchedulerBufferSize() {
  Observable.range(1, 10)
  .window(1, 1, TimeUnit.MINUTES, Schedulers.io(), 2)
  .flatMap(Functions.<Observable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorOverlap() {
  Observable.error(new TestException())
  .window(2, 1)
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorExact() {
  Observable.error(new TestException())
  .window(1)
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorSkip() {
  Observable.error(new TestException())
  .window(1, 2)
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void mainError() {
  Observable.error(new TestException())
  .window(Functions.justCallable(Observable.never()))
  .test()
  .assertError(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void boundaryOnError() {
  TestObserver<Object> to = Observable.error(new TestException())
  .window(Observable.never())
  .flatMap(Functions.<Observable<Object>>identity(), true)
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void exactBoundaryError() {
  Observable.error(new TestException())
  .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
  .test()
  .assertSubscribed()
  .assertError(TestException.class)
  .assertNotComplete();
}

相关文章

Observable类方法