rx.Observable.collect()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(106)

本文整理了Java中rx.Observable.collect()方法的一些代码示例,展示了Observable.collect()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.collect()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:collect

Observable.collect介绍

[英]Collects items emitted by the source Observable into a single mutable data structure and returns an Observable that emits this structure.

This is a simplified version of reduce that does not need to return the state on each pass. Backpressure Support: This operator does not support backpressure because by intent it will receive all values and reduce them to a single onNext. Scheduler: collect does not operate by default on a particular Scheduler.
[中]将源可观测项发出的项收集到单个可变数据结构中,并返回发出此结构的可观测项。
这是reduce的简化版本,不需要在每次传递时返回状态。背压支持:此运算符不支持背压,因为它会接收所有值,并将它们减少到单个onNext。调度程序:默认情况下,collect不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxNetty

@Override
public Observable<ByteBuf> call(Observable<ByteBuf> upstream) {
  return upstream
    .collect(
      new Func0<CompositeByteBuf>() {
        @Override
        public CompositeByteBuf call() {
          return Unpooled.compositeBuffer();
        }
      },
      new Action2<CompositeByteBuf, ByteBuf>() {
        @Override
        public void call(CompositeByteBuf collector, ByteBuf buf) {
          long newLength = collector.readableBytes() + buf.readableBytes();
          if (newLength <= maxBytes) {
            collector.addComponent(true, buf);
          } else {
            collector.release();
            buf.release();
            throw new TooMuchDataException("More than " + maxBytes + "B received");
          }
        }
      }
    )
    .cast(ByteBuf.class);
}

代码示例来源:origin: apache/usergrid

@Override
public Set<String> getConnectionsAsSource( final EntityRef entityRef ) {
  Preconditions.checkNotNull(entityRef, "entityRef cannot be null");
  final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
  final SearchEdgeType searchByEdgeType = createConnectionTypeSearch( entityRef.asId() );
  return graphManager.getEdgeTypesFromSource(
    searchByEdgeType ).map( edgeName -> getConnectionNameFromEdgeName( edgeName ) )
      .collect( () -> new HashSet<String>(), ( r, s ) -> r.add( s ) ).toBlocking().last();
}

代码示例来源:origin: apache/usergrid

@Override
public Set<String> getConnectionsAsTarget( final EntityRef entityRef ) {
  Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
  final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
  final SearchEdgeType searchByEdgeType = createConnectionTypeSearch( entityRef.asId() );
  return graphManager.getEdgeTypesToTarget(searchByEdgeType).map(
    edgeName -> getConnectionNameFromEdgeName( edgeName ) )
      .collect( () -> new HashSet<String>(  ), ( r, s ) -> r.add(s) ).toBlocking().last();
}

代码示例来源:origin: apache/usergrid

@Override
public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {
  final GraphManager gm = managerCache.getGraphManager( applicationScope );
  Observable<String> edges =
    gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
  return edges.collect( () -> new HashSet<String>(), ( edgeSet, edge ) -> {
    edgeSet.add( CpNamingUtils.getNameFromEdgeType( edge ) );
  } ).toBlocking().last();
}

代码示例来源:origin: apache/usergrid

private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
  return observable -> observable
    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
    .filter(msg -> !msg.isEmpty())
    .doOnNext(indexOperation -> {
      asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
    });
}

代码示例来源:origin: apache/usergrid

private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
  return observable -> observable
    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
    .filter(msg -> !msg.isEmpty())
    .doOnNext(indexOperation -> {
      asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
    });
}

代码示例来源:origin: apache/usergrid

@Override
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
  GraphManager gm = managerCache.getGraphManager( applicationScope );
  String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
  if (logger.isTraceEnabled()) {
    logger.trace("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}",
      edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() );
  }
  Observable<Set<String>> types =
    gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) )
     .collect( () -> new HashSet<>(), ( set, type ) -> set.add( type ) );
  return types.toBlocking().last();
}

代码示例来源:origin: apache/usergrid

@Override
public Observable<ResultsPage<T>> call( final Observable<FilterResult<T>> filterResultObservable ) {
  final int limit = pipelineContext.getLimit();
  return filterResultObservable
    .buffer( limit )
    .flatMap( buffer
      -> Observable
        .from( buffer )
        .collect(() -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) )
    )
    .map( resultsPageCollector ->
      new ResultsPage(
        resultsPageCollector.results,
        new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
      )
    );
}

代码示例来源:origin: apache/usergrid

@Test()
public void testSequence(){
  ArrayList listReturn =  Observable.range(0, 1).flatMap(i -> Observable.empty())
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().lastOrDefault(null);
  Assert.assertEquals(listReturn,new ArrayList<Integer>());
}

代码示例来源:origin: apache/usergrid

@Test()
public void testSequence2(){
  ArrayList listReturn =  Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().lastOrDefault(null);
  Assert.assertEquals(listReturn,new ArrayList<Integer>());
}

代码示例来源:origin: apache/usergrid

.collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
  if (logger.isTraceEnabled()) {
    logger.trace("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);

代码示例来源:origin: apache/usergrid

}, 10 ).collect( () -> new EntitySetImpl( entityIds.size() ), ( ( entitySet, rows ) -> {
  final Iterator<Row<ScopedRowKey<Id>, Boolean>> latestEntityColumns = rows.iterator();

代码示例来源:origin: apache/usergrid

@Override
public Map<String, Long> getEachCollectionSize(ApplicationScope applicationScope) {
  final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
  EntityIndex entityIndex = entityIndexFactory.createEntityIndex(indexLocationStrategy);
  GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
  Map<String,Long> sumMap = ObservableTimer.time(
      graphManager.getEdgeTypesFromSource(new SimpleSearchEdgeType(applicationScope.getApplication(), CpNamingUtils.EDGE_COLL_PREFIX, Optional.<String>absent()))
        .collect(() -> new HashMap<String,Long>(), ((map, type) ->
          {
            SearchEdge edge = CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), type);
            final String collectionName = CpNamingUtils.getCollectionNameFromEdgeName(type);
            long sumType =  entityIndex.getTotalEntitySizeInBytes(edge);
            map.put(collectionName,sumType);
          })
        )
    , sumTimer).toBlocking().last();
  return sumMap;
}

代码示例来源:origin: apache/usergrid

buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
  ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {

代码示例来源:origin: apache/usergrid

@Override
public Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope,
                           final Entity entity ) {
  //bootstrap the lower modules from their caches
  final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
  final Id entityId = entity.getId();
  //we always index in the target scope
  final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
  //we may have to index  we're indexing from source->target here
  final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
  //do our observable for batching
  //try to send a whole batch if we can
  final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex
    .buffer(indexFig.getIndexBatchSize() )
    //map into batches based on our buffer size
    .flatMap( buffer -> Observable.from( buffer )
      //collect results into a single batch
      .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
        if (logger.isDebugEnabled()) {
          logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
        }
        final Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
        batch.index( indexEdge, entity ,fieldsToIndex);
      } )
        //return the future from the batch execution
      .map( batch -> batch.build() ) );
  return ObservableTimer.time( batches, indexTimer );
}

代码示例来源:origin: apache/usergrid

.flatMap( entitySet -> Observable.from( entitySet.getEntities() ) )
.collect( () -> new HashMap<String, UUID>(), ( appMap, entity ) -> {

代码示例来源:origin: apache/usergrid

@Test()
public void testSequence3(){
  ArrayList listReturn =  Observable.range(0, 2)
    .collect(()->new ArrayList(),(list,i) ->{
      list.add(i);
    }).toBlocking().first();
  Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
}

代码示例来源:origin: apache/usergrid

return edges.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( entityRefSetMap, edge ) -> {
  if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
    if (logger.isDebugEnabled()) {

代码示例来源:origin: apache/usergrid

.collect( () -> new DataLoadResult(), ( dataloadResult, entitySearchResult ) -> {
  if ( entitySearchResult.found ) {
    dataloadResult.success();

代码示例来源:origin: apache/usergrid

Observable.from( entities ).collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
  IndexEdge edge = new IndexEdgeImpl( indexEdge.getNodeId(), indexEdge.getEdgeName(),
      SearchEdge.NodeType.SOURCE, edgeCounter.incrementAndGet()  );

相关文章

Observable类方法