rx.Observable.toList()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.9k)|赞(0)|评价(0)|浏览(149)

本文整理了Java中rx.Observable.toList()方法的一些代码示例,展示了Observable.toList()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.toList()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:toList

Observable.toList介绍

[英]Returns an Observable that emits a single item, a list composed of all the items emitted by the source Observable.

Normally, an Observable that returns multiple items will do so by invoking its Observer's Observer#onNext method for each such item. You can change this behavior, instructing the Observable to compose a list of all of these items and then to invoke the Observer's onNextfunction once, passing it the entire list, by calling the Observable's toList method prior to calling its #subscribe method.

Be careful not to use this operator on Observables that emit infinite or very large numbers of items, as you do not have the option to unsubscribe. Backpressure Support: This operator does not support backpressure as by intent it is requesting and buffering everything. Scheduler: toList does not operate by default on a particular Scheduler.
[中]返回发出单个项的可观察项,该列表由源可观察项发出的所有项组成。
通常,返回多个项目的Observable将通过为每个此类项目调用其Observator的Observator#onNext方法来实现。你可以改变这种行为,在调用被观察者的#subscribe方法之前调用其toList方法,指示被观察者组成一个包含所有这些项的列表,然后调用观察者的onNextfunction一次,将整个列表传递给它。
注意不要在发射无限或非常大量项目的观测值上使用此运算符,因为您没有取消订阅的选项。背压支持:该操作员无意支持背压,因为它正在请求和缓冲所有内容。Scheduler:toList默认情况下不会在特定的计划程序上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

@Override
  public Observable<List<CachedValuesHistogram>> call(Observable<CachedValuesHistogram> windowOf2) {
    return windowOf2.toList();
  }
};

代码示例来源:origin: grandcentrix/tray

private List<String> getNiceString(final Collection<TrayItem> items) {
  return Observable.from(items)
      .map(new Func1<TrayItem, String>() {
        @Override
        public String call(final TrayItem trayItem) {
          return "key: '" + trayItem.key() + "' value '" + trayItem.value() + "'";
        }
      })
      .toList().toBlocking().first();
}

代码示例来源:origin: spring-projects/spring-framework

@PostMapping("/observable")
public Observable<Void> createWithObservable(@RequestBody Observable<Person> observable) {
  return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
}

代码示例来源:origin: jooby-project/jooby

@SuppressWarnings("rawtypes")
static Route.Mapper mapper() {
 return Route.Mapper.create("mongo-rx", v -> {
  if (v instanceof FindObservable) {
   return ((FindObservable) v).toObservable().toList();
  } else if (v instanceof ListCollectionsObservable) {
   return ((ListCollectionsObservable) v).toObservable().toList();
  } else if (v instanceof ListDatabasesObservable) {
   return ((ListDatabasesObservable) v).toObservable().toList();
  } else if (v instanceof AggregateObservable) {
   return ((AggregateObservable) v).toObservable().toList();
  } else if (v instanceof DistinctObservable) {
   return ((DistinctObservable) v).toObservable().toList();
  } else if (v instanceof MapReduceObservable) {
   return ((MapReduceObservable) v).toObservable().toList();
  } else if (v instanceof MongoObservable) {
   return ((MongoObservable) v).toObservable();
  }
  return v;
 });
}

代码示例来源:origin: Rukey7/MvpApp

@Override
public void getData(boolean isRefresh) {
  mDbDao.queryBuilder().rx()
      .oneByOne()
      .filter(new Func1<VideoInfo, Boolean>() {
        @Override
        public Boolean call(VideoInfo info) {
          // 判断是否存于下载中
          return (info.getDownloadStatus() != DownloadStatus.NORMAL &&
              info.getDownloadStatus() != DownloadStatus.COMPLETE);
        }
      })
      .toList()
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Action1<List<VideoInfo>>() {
        @Override
        public void call(List<VideoInfo> videoList) {
          if (ListUtils.isEmpty(videoList)) {
            mView.noData();
          } else {
            mView.loadData(videoList);
          }
        }
      });
}

代码示例来源:origin: jooby-project/jooby

@Override
public <T> Observable<AsyncViewQueryResult<T>> query(final ViewQuery query) {
 return bucket.query(query)
   .map(result -> {
    Observable<List<T>> rows = result.rows()
      .flatMap(r -> r.document()
        .map(doc -> {
         EntityDocument<T> entity = converter.toEntity(doc, null);
         return entity.content();
        }))
      .toList();
    return new AsyncViewQueryResult(result.totalRows(), rows);
   });
}

代码示例来源:origin: apache/usergrid

@Override
public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
  final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  final EntityCollectionManager entityCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( applicationScope );
  //it's more efficient to make 1 network hop to get everything, then drop our results if required
  final Observable<FilterResult<Entity>> entityObservable =
    filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
      if (logger.isTraceEnabled()) {
        logger.trace("Attempting to batch load ids {}", bufferedIds);
      }
      final Observable<EntitySet> entitySetObservable =
        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
             .flatMap( ids -> entityCollectionManager.load( ids ) );
      //now we have a collection, validate our candidate set is correct.
      GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
      return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
        entitySet, bufferedIds, readRepairFig ) )
                   .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
          entityCollector -> Observable.from( entityCollector.getResults() ) );
    } );
  return entityObservable;
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void publisherToRxObservable() {
  List<Integer> sequence = Arrays.asList(1, 2, 3);
  Publisher<Integer> source = Flowable.fromIterable(sequence);
  Object target = getAdapter(rx.Observable.class).fromPublisher(source);
  assertTrue(target instanceof rx.Observable);
  assertEquals(sequence, ((rx.Observable<?>) target).toList().toBlocking().first());
}

代码示例来源:origin: jooby-project/jooby

@Override
public <T> Observable<List<T>> query(final N1qlQuery query) {
 return bucket.query(query)
   .flatMap(aqr -> Observable.zip(aqr.rows().toList(),
     aqr.errors().toList(),
     aqr.finalSuccess().singleOrDefault(Boolean.FALSE),
     (rows, errors, finalSuccess) -> {
      if (!finalSuccess) {
       throw new QueryExecutionException(
         "execution of query resulted in exception: ",
         Try.apply(() -> errors.get(0)).orElse(null));
      }
      List<T> value = new ArrayList<>();
      for (AsyncN1qlQueryRow row : rows) {
       try {
        T v = converter.fromBytes(row.byteValue());
        value.add(v);
       } catch (IOException ex) {
        throw new QueryExecutionException(
          "execution of query resulted in exception", null, ex);
       }
      }
      return value;
     }));
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void observableTestBean() throws Exception {
  String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
  ResolvableType type = forClassWithGenerics(Observable.class, TestBean.class);
  MethodParameter param = this.testMethod.arg(type);
  Observable<?> observable = resolveValue(param, body);
  assertEquals(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")),
      observable.toList().toBlocking().first());
}

代码示例来源:origin: PipelineAI/pipeline

/**
 * Run the command via {@link com.netflix.hystrix.HystrixCommand#observe()}, immediately block and then assert
 * @param command command to run
 * @param assertion assertions to check
 * @param isSuccess should the command succeed?
 */
protected void assertBlockingObserve(C command, Action1<C> assertion, boolean isSuccess) {
  System.out.println("Running command.observe(), immediately blocking and then running assertions...");
  if (isSuccess) {
    try {
      command.observe().toList().toBlocking().single();
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  } else {
    try {
      command.observe().toList().toBlocking().single();
      fail("Expected a command failure!");
    } catch (Exception ex) {
      System.out.println("Received expected ex : " + ex);
      ex.printStackTrace();
    }
  }
  assertion.call(command);
}

代码示例来源:origin: PipelineAI/pipeline

o.toList().toBlocking().single();
} catch (Exception ex) {
  throw new RuntimeException(ex);
  o.toList().toBlocking().single();
  fail("Expected a command failure!");
} catch (Exception ex) {

代码示例来源:origin: apache/usergrid

@Override
public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
  /**
   * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
   * objects
   */
  final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  final EntityCollectionManager entityCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( applicationScope );
  final EntityIndex applicationIndex =
    entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
  final Observable<FilterResult<Id>> searchIdSetObservable =
    filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
        //flatten toa list of ids to load
        final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
          candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
        //load the ids
        final Observable<VersionSet> versionSetObservable =
          candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
        //now we have a collection, validate our canidate set is correct.
        return versionSetObservable.map(
          entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
            candidateResults, indexProducer ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
          entityCollector -> Observable.from( entityCollector.collectResults() ) );
      } );
  return searchIdSetObservable;
}

代码示例来源:origin: apache/usergrid

private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
                           final int edgeCount ) {
  final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
    //create our connection edge.
    final Id connectingId = createId( "connecting" );
    final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
    return graphManager.writeEdge( connectionEdge ).subscribeOn( Schedulers.io() );
  }, 20).toList().toBlocking().last();
  assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
  return connectionSearchEdges;
}

代码示例来源:origin: apache/usergrid

@Test()
public void testSequence3(){
  ArrayList listReturn =  Observable.range(0, 2)
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().first();
  Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
}

代码示例来源:origin: PipelineAI/pipeline

List<String> results = overall.toList().toBlocking().first(); //wait for all commands to complete

代码示例来源:origin: PipelineAI/pipeline

final List<Boolean> blockingList = Observable.merge(results).toList().toBlocking().single();

代码示例来源:origin: apache/usergrid

@Test
public void testSingleConnection() {
  final ApplicationScope applicationScope = new ApplicationScopeImpl( new SimpleId( "application" ) );
  final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  //now write a single connection
  final Id source = new SimpleId( "source" );
  //add to a collection
  final String collectionName = "testCollection";
  final Edge collectionEdge = CpNamingUtils.createCollectionEdge( applicationScope.getApplication(), collectionName, source );
  final Edge writtenCollection = gm.writeEdge( collectionEdge ).toBlocking().last();
  assertNotNull("Collection edge written", writtenCollection);
  final Id target = new SimpleId( "target" );
  final String connectionType = "testConnection";
  final Edge connectionEdge = CpNamingUtils.createConnectionEdge( source, connectionType, target );
  final Edge writtenConnection = gm.writeEdge( connectionEdge ).toBlocking().last();
  //now run the cleanup
  final int count =
    connectionService.deDupeConnections( Observable.just( applicationScope ) ).count().toBlocking().last();
  assertEquals( "No edges deleted", 0, count );
  //now ensure we can read the edge.
  final SearchByEdge simpleSearchByEdge =
    new SimpleSearchByEdge( source, connectionEdge.getType(), target, Long.MAX_VALUE,
      SearchByEdgeType.Order.DESCENDING, Optional.absent() );
  final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
  assertEquals( 1, edges.size() );
  assertEquals( writtenConnection, edges.get( 0 ) );
}

代码示例来源:origin: PipelineAI/pipeline

/**
 * Test support of multiple onNext events.
 */
@Test
public void testExecutionSuccessWithMultipleEvents() {
  try {
    TestCommandWithMultipleValues command = new TestCommandWithMultipleValues();
    assertEquals(Arrays.asList(true, false, true), command.observe().toList().toBlocking().single());
    assertEquals(null, command.getFailedExecutionException());
    assertTrue(command.getExecutionTimeInMilliseconds() > -1);
    assertTrue(command.isSuccessfulExecution());
    assertCommandExecutionEvents(command, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.SUCCESS);
    assertEquals(0, command.metrics.getCurrentConcurrentExecutionCount());
    assertSaneHystrixRequestLog(1);
    // semaphore isolated
    assertFalse(command.isExecutedInThread());
  } catch (Exception e) {
    e.printStackTrace();
    fail("We received an exception.");
  }
}

代码示例来源:origin: PipelineAI/pipeline

Observable.merge(result1, result2).toList().toBlocking().single(); //await the 2 latent commands

相关文章

Observable类方法