Usage and code examples of the rx.Observable.range() method

x33g5p2x, reposted on 2022-01-25 in Other

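This article collects code examples of the rx.Observable.range() method in Java and shows how Observable.range() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as references. Details of the Observable.range() method are as follows:
Package path: rx.Observable
Class name: Observable
Method name: range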

Introduction to Observable.range

Returns an Observable that emits a sequence of Integers within a specified range.

Scheduler: range does not operate by default on a particular Scheduler.
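The second argument of range is a count, not an end value, which is easy to misread. The sketch below models the values that Observable.range(start, count) would emit using only java.util.stream.IntStream. It does not call RxJava, and the names RangeModel and emitted are hypothetical, chosen only for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper: a plain-Java model of the values range(start, count) emits.
final class RangeModel {

    // Returns start, start + 1, ..., start + count - 1 (count values in total).
    static List<Integer> emitted(int start, int count) {
        if (count < 0) {
            // Assumption: mirrors RxJava rejecting a negative count.
            throw new IllegalArgumentException("count must not be negative");
        }
        // IntStream.range takes an exclusive END, so convert the count to an end value.
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(emitted(5, 3)); // [5, 6, 7]
        System.out.println(emitted(0, 0)); // []
    }
}
```

Under this model, a paging call such as range(pageNumber * pageSize, pageSize) with pageNumber = 2 and pageSize = 3 corresponds to emitted(6, 3), that is, the values 6, 7, 8.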

Code examples

Code example source: konmik/nucleus

private Observable<String> requestPage(int pageNumber, int pageSize) {
  return Observable.range(pageNumber * pageSize, pageSize).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
      return integer.toString();
    }
  });
}

Code example source: bumptech/glide

Observable<List<Image>> getHotViralImages(@SuppressWarnings("SameParameterValue") int maxPages) {
  return Observable.range(0, maxPages)
    .flatMap(new Func1<Integer, Observable<List<Image>>>() {
      @Override

Code example source: apache/usergrid

@Test()
public void testSequence3(){
  ArrayList listReturn =  Observable.range(0, 2)
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().first();
  Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
}

Code example source: apache/usergrid

@Test( expected = TestException.class )
public void throwOnBlockingLast() {
  Observable.range( 0, 1 ).map( integer -> {
    throw new TestException( "I throw an exception" );
  } ).toBlocking().last();
}

Code example source: apache/usergrid

public Observable<Entity> createStreamFromWorkers(  final SearchEdge indexEdge,
                          final String uniqueIdentifier ) {
  //create a sequence of observables. Each index will be its own worker thread using Schedulers.newThread()
  return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
    integer -> createWriteObservable(  indexEdge, uniqueIdentifier, integer )
      .subscribeOn( Schedulers.newThread() ) );
}

Code example source: apache/usergrid

@Test( expected = TestException.class )
public void throwOnBlockingFirst() {
  Observable.range( 0, 1 ).map( integer -> {
    throw new TestException( "I throw an exception" );
  } ).toBlocking().first();
}

Code example source: apache/usergrid

@Test()
public void testSequence(){
  ArrayList listReturn =  Observable.range(0, 1).flatMap(i -> Observable.empty())
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().lastOrDefault(null);
  Assert.assertEquals(listReturn,new ArrayList<Integer>());
}

Code example source: apache/usergrid

@Test()
public void testSequence2(){
  ArrayList listReturn =  Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().lastOrDefault(null);
  Assert.assertEquals(listReturn,new ArrayList<Integer>());
}

Code example source: apache/usergrid

/**
 * Tests that reduce with a seed emits the seed when the filtered sequence is empty
 */
@Test
public void testReduceEmpty(){
  final int result =  Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
  assertEquals(0, result);
}

Code example source: apache/usergrid

/**
 *  Tests working with observers
 */
@Test( expected = TestException.class )
public void throwOnSubscribeObservable() {
  final ReThrowObserver exceptionObserver = new ReThrowObserver();
  Observable.range( 0, 1 ).map( integer -> {
    throw new TestException( "I throw an exception" );
  } ).subscribe( exceptionObserver );
  exceptionObserver.checkResult();
}

Code example source: apache/usergrid

@Test
@Category(ExperimentalTest.class )
public void testPublish() throws InterruptedException {
  final int count = 10;
  final CountDownLatch latch = new CountDownLatch( count+1 );
  final Subscription connectedObservable =
    Observable.range( 0, count )
      .doOnNext( integer -> latch.countDown() )
      .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
      .subscribe();
  final boolean completed = latch.await( 3, TimeUnit.SECONDS );
  assertTrue( "publish1 behaves as expected", completed );
  final boolean completedSubscription = connectedObservable.isUnsubscribed();
  assertTrue( "Subscription complete", completedSubscription );
}

Code example source: ReactiveX/RxNetty

@Test(timeout = 60000)
public void testScheduleNow() throws Exception {
  RxJavaEventloopScheduler scheduler = new RxJavaEventloopScheduler(new NioEventLoopGroup());
  TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
  Observable.range(1, 1)
       .observeOn(scheduler)
       .subscribe(testSubscriber);
  testSubscriber.awaitTerminalEvent();
  testSubscriber.assertNoErrors();
  testSubscriber.assertValue(1);
}

Code example source: apache/usergrid

private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
                           final int edgeCount ) {
  final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
    //create our connection edge.
    final Id connectingId = createId( "connecting" );
    final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
    return graphManager.writeEdge( connectionEdge ).subscribeOn( Schedulers.io() );
  }, 20).toList().toBlocking().last();
  assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
  return connectionSearchEdges;
}

Code example source: Netflix/conductor

@Test
public void test() {
  List<Message> messages = new LinkedList<>();
  Observable.range(0, 10).forEach((Integer x) -> messages.add(new Message("" + x, "payload: " + x, null)));
  assertEquals(10, messages.size());
  SQSObservableQueue queue = mock(SQSObservableQueue.class);
  when(queue.getOrCreateQueue()).thenReturn("junit_queue_url");
  Answer<?> answer = (Answer<List<Message>>) invocation -> Collections.emptyList();
  when(queue.receiveMessages()).thenReturn(messages).thenAnswer(answer);
  when(queue.getOnSubscribe()).thenCallRealMethod();
  when(queue.observe()).thenCallRealMethod();
  List<Message> found = new LinkedList<>();
  Observable<Message> observable = queue.observe();
  assertNotNull(observable);
  observable.subscribe(found::add);
  Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
  assertEquals(messages.size(), found.size());
  assertEquals(messages, found);
}

Code example source: apache/usergrid

@Test
@Category(ExperimentalTest.class )
public void testConnectableObserver() throws InterruptedException {
  final int count = 10;
  final CountDownLatch latch = new CountDownLatch( count );
  final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
  //connect to our latch, which should run on its own subscription
  //start our latch running
  connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
  final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
  //start the sequence
  connectedObservable.connect();
  final boolean completed = latch.await( 5, TimeUnit.SECONDS );
  assertTrue( "publish1 behaves as expected", completed );
  final int returnedCount = countObservable.toBlocking().last();
  assertEquals( "Counts the same", count, returnedCount );
}

Code example source: apache/usergrid

/**
 *  Tests working with observers
 */
@Test( expected = TestException.class )
public void throwOnSubscribeObservableNewThread() throws Exception {
  final ReThrowObserver exceptionObserver = new ReThrowObserver();
  Observable.range( 0, 1 ).map(integer -> {
    throw new TestException("I throw an exception");
  })
    .doOnError(t -> exceptionObserver.onError(t))
    .subscribeOn(Schedulers.newThread())
    .subscribe(exceptionObserver);
  for(int i =0; i<5; i++) {
    exceptionObserver.checkResult();
    Thread.sleep(200);
  }
}

Code example source: konmik/nucleus

.startWith(Observable.range(0, requestedPageCount))
.concatMap(new Func1<Integer, Observable<String>>() {
  @Override

Code example source: apache/usergrid

final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
  final Id connectingId = createId( "connecting" );
  final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );

Code example source: apache/usergrid

final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
  final Id connectingId = createId("connecting");
  final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());

Code example source: apache/usergrid

return Observable.range( 0, indexTestFig.getNumberOfRecords() )
