Usage and code examples of the rx.Observable.filter() method

x33g5p2x, reposted on 2022-01-25 in Other

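This article collects some Java code examples of the rx.Observable.filter() method and shows how Observable.filter() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.filter() method are as follows:
Package path: rx.Observable
Class name: Observable
Method name: filter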

About Observable.filter

Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.

Scheduler: filter does not operate by default on a particular Scheduler.
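The behavior described above can be sketched without any library. The block below is a minimal stand-in written with plain java.util types, not RxJava code: the class FilterSketch and its methods filter and emitFiltered are hypothetical names introduced only for illustration. It shows the core idea that a filter step forwards an item downstream only when the predicate returns true, and does so on whatever thread pushes the items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for illustration only; this is NOT the RxJava implementation.
final class FilterSketch {

    // Wraps a downstream consumer so that it only receives items matching the predicate.
    static <T> Consumer<T> filter(Predicate<? super T> predicate, Consumer<? super T> downstream) {
        return item -> {
            if (predicate.test(item)) {
                downstream.accept(item);
            }
        };
    }

    // Pushes every source item through the filtered consumer on the calling thread
    // and collects whatever gets through.
    static <T> List<T> emitFiltered(List<T> source, Predicate<? super T> predicate) {
        List<T> received = new ArrayList<>();
        Consumer<T> filtered = filter(predicate, received::add);
        source.forEach(filtered);
        return received;
    }

    public static void main(String[] args) {
        List<Integer> evens = emitFiltered(Arrays.asList(1, 2, 3, 4, 5, 6), n -> n % 2 == 0);
        System.out.println(evens); // prints [2, 4, 6]
    }
}
```

Items that fail the predicate are simply dropped, so a source whose items all fail produces an empty result rather than an error.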

Code examples

Code example source: origin: RichardWarburton/java-8-lambdas-exercises

public Observable<Artist> search(String searchedName,
                                 String searchedNationality,
                                 int maxResults) {

  return getSavedArtists()
      .filter(name -> name.contains(searchedName))
      .flatMap(this::lookupArtist)
      .filter(artist -> artist.getNationality()
                              .contains(searchedNationality))
      .take(maxResults);
}

Code example source: origin: konmik/nucleus

@Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
    return Observable
      .combineLatest(
        view,
        observable
          .materialize()
          .filter(new Func1<Notification<T>, Boolean>() {
            @Override
            public Boolean call(Notification<T> notification) {
              return !notification.isOnCompleted();
            }
          }),
        new Func2<View, Notification<T>, Delivery<View, T>>() {
          @Override
          public Delivery<View, T> call(View view, Notification<T> notification) {
            return view == null ? null : new Delivery<>(view, notification);
          }
        })
      .filter(new Func1<Delivery<View, T>, Boolean>() {
        @Override
        public Boolean call(Delivery<View, T> delivery) {
          return delivery != null;
        }
      });
  }
}

Code example source: origin: amitshekhariitbhu/Fast-Android-Networking

.filter(new Func1<User, Boolean>() {
  @Override
  public Boolean call(User user) {

Code example source: origin: HotBitmapGG/bilibili-android-client

private void search() {
  RxView.clicks(mSearchBtn)
      .throttleFirst(2, TimeUnit.SECONDS)
      .map(aVoid -> mSearchEdit.getText().toString().trim())
      .filter(s -> !TextUtils.isEmpty(s))
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(s -> {
        KeyBoardUtil.closeKeybord(mSearchEdit, TotalStationSearchActivity.this);
        showSearchAnim();
        clearData();
        content = s;
        getSearchData();
      });
}

Code example source: origin: HotBitmapGG/bilibili-android-client

.filter(integer -> !TextUtils.isEmpty(mSearchEdit.getText().toString().trim()))
.filter(integer -> integer == EditorInfo.IME_ACTION_SEARCH)
.flatMap(new Func1<Integer, Observable<String>>() {
  @Override

Code example source: origin: Rukey7/MvpApp

/**
 * Initialize first, recording the photos that were downloaded earlier
 * @param dbDao
 */
public static void init(BeautyPhotoInfoDao dbDao) {
  dbDao.queryBuilder().rx().list()
      .subscribeOn(Schedulers.io())
      .flatMap(new Func1<List<BeautyPhotoInfo>, Observable<BeautyPhotoInfo>>() {
        @Override
        public Observable<BeautyPhotoInfo> call(List<BeautyPhotoInfo> photoList) {
          return Observable.from(photoList);
        }
      })
      .filter(new Func1<BeautyPhotoInfo, Boolean>() {
        @Override
        public Boolean call(BeautyPhotoInfo bean) {
          return bean.isDownload();
        }
      })
      .subscribe(new Action1<BeautyPhotoInfo>() {
        @Override
        public void call(BeautyPhotoInfo bean) {
          sDlPhotos.put(bean.getImgsrc().hashCode(), true);
        }
      });
}

Code example source: origin: apache/usergrid

public Observable<MarkedEdge> repair( final ApplicationScope scope, final MarkedEdge edge, final UUID timestamp ) {
  //merge source and target then deal with the distinct values
  return Observable.just( edge ).filter( markedEdge-> markedEdge.isDeleted() )
      .doOnNext( markedEdge -> {
          //it's still in the same state as it was when we queued it. Remove it
          if(logger.isDebugEnabled()){
            logger.debug( "Removing edge {} ", markedEdge );
          }
          //remove from the commit log
          //remove from storage
          try {
            storageSerialization.deleteEdge( scope, markedEdge, timestamp ).execute();
          }
          catch ( ConnectionException e ) {
            throw new RuntimeException( "Unable to connect to Cassandra", e );
          }
        }
     );
}

Code example source: origin: Rukey7/MvpApp

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.filter(new Func1<NewsTypeInfo, Boolean>() {
  @Override
  public Boolean call(NewsTypeInfo newsTypeBean) {

Code example source: origin: Rukey7/MvpApp

@Override
public void getData(boolean isRefresh) {
  mDbDao.queryBuilder().rx()
      .oneByOne()
      .filter(new Func1<VideoInfo, Boolean>() {
        @Override
        public Boolean call(VideoInfo info) {
          // Check whether the item is still being downloaded
          return (info.getDownloadStatus() != DownloadStatus.NORMAL &&
              info.getDownloadStatus() != DownloadStatus.COMPLETE);
        }
      })
      .toList()
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Action1<List<VideoInfo>>() {
        @Override
        public void call(List<VideoInfo> videoList) {
          if (ListUtils.isEmpty(videoList)) {
            mView.noData();
          } else {
            mView.loadData(videoList);
          }
        }
      });
}

Code example source: origin: apache/usergrid

@Override
public Observable<Edge> deleteEdge( final Edge edge ) {
  GraphValidation.validateEdge( edge );
  final UUID startTimestamp = UUIDGenerator.newTimeUUID();
  final Observable<Edge> observable =
    Observable.create( new ObservableIterator<MarkedEdge>( "read edge versions" ) {
      @Override
      protected Iterator<MarkedEdge> getIterator() {
        return storageEdgeSerialization.getEdgeVersions( scope,
          new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
            Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) );
      }
    } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked ->
      //fire our delete listener and wait for the results
      edgeDeleteListener.receive( scope, marked, startTimestamp ).doOnNext(
        //log them
        count -> logger.trace( "removed {} types for edge {} ", count, edge ) )
        //return the marked edge
        .map( count -> marked ) );
  return ObservableTimer.time( observable, deleteEdgeTimer );
}

Code example source: origin: konmik/nucleus

final Subscription subscription = observable
  .materialize()
  .filter(new Func1<Notification<T>, Boolean>() {
    @Override
    public Boolean call(Notification<T> notification) {

Code example source: origin: konmik/nucleus

@Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
    return observable.materialize()
      .take(1)
      .switchMap(new Func1<Notification<T>, Observable<? extends Delivery<View, T>>>() {
        @Override
        public Observable<? extends Delivery<View, T>> call(final Notification<T> notification) {
          return view.map(new Func1<View, Delivery<View, T>>() {
            @Override
            public Delivery<View, T> call(View view) {
              return view == null ? null : new Delivery<>(view, notification);
            }
          });
        }
      })
      .filter(new Func1<Delivery<View, T>, Boolean>() {
        @Override
        public Boolean call(Delivery<View, T> delivery) {
          return delivery != null;
        }
      })
      .take(1);
  }
}

Code example source: origin: apache/usergrid

private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
  return observable -> observable
    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
    .filter(msg -> !msg.isEmpty())
    .doOnNext(indexOperation -> {
      asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
    });
}

Code example source: origin: apache/usergrid

.filter(id -> {
  final String type = InflectionUtils.pluralize(((Id) id).getType());
  return ! (type.equals(Schema.COLLECTION_USERS)

Code example source: origin: apache/usergrid

/**
 * Use Graph to get old appinfos from the old and deprecated System App.
 */
public Observable<org.apache.usergrid.persistence.model.entity.Entity> getOldAppInfos() {
  final ApplicationScope systemAppScope = getApplicationScope( SYSTEM_APP_ID );
  final EntityCollectionManager systemCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( systemAppScope );
  final GraphManager gm = graphManagerFactory.createEdgeManager( systemAppScope );
  String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( "appinfos" );
  Id rootAppId = systemAppScope.getApplication();
  final SimpleSearchByEdgeType simpleSearchByEdgeType =
    new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
      Optional.absent() );
  Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs =
    gm.loadEdgesFromSource( simpleSearchByEdgeType ).flatMap( edge -> {
      final Id appInfoId = edge.getTargetNode();
      return systemCollectionManager.load( appInfoId ).filter( entity -> ( entity != null ) );
    } );
  return entityObs;
}

Code example source: origin: apache/usergrid

@Override
public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
  final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
  final Id entityId = entityIndexOperation.getId();
  //load the entity
  return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter(
    entity -> {
      final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
      /**
       * We don't have a modified field, so we can't check, pass it through
       */
      if ( modified == null ) {
        return true;
      }
      //entityIndexOperation.getUpdatedSince will always be 0 except for reindexing the application
      //only re-index if it has been updated and been updated after our timestamp
      return modified.getValue() >= entityIndexOperation.getUpdatedSince();
    } )
    //perform indexing on the task scheduler and start it
    .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
}

Code example source: origin: apache/usergrid

/**
 * Tests that reduce emits the seed value when filter lets no items through
 */
@Test
public void testReduceEmpty(){
  final int result =  Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
  assertEquals(0, result);
}

Code example source: origin: ReactiveX/RxNetty

@Override
  public void call(Subscriber<? super Connection<R, W>> subscriber) {
    if (isShutdown) {
      subscriber.onError(new IllegalStateException("Connection provider is shutdown."));
    }
    idleConnectionsHolder.pollThisEventLoopConnections()
               .concatWith(connectIfAllowed())
               .filter(new Func1<PooledConnection<R, W>, Boolean>() {
                 @Override
                 public Boolean call(PooledConnection<R, W> c) {
                   boolean isUsable = c.isUsable();
                   if (!isUsable) {
                     discardNow(c);
                   }
                   return isUsable;
                 }
               })
               .take(1)
               .lift(new ReuseSubscriberLinker())
               .lift(new ConnectMetricsOperator())
               .unsafeSubscribe(subscriber);
  }
});

Code example source: origin: apache/usergrid

/**
 * TODO: Use Graph to get application_info for an specified Application.
 */
private org.apache.usergrid.persistence.Entity getApplicationInfo( final UUID appId ) throws Exception {
  final ApplicationScope managementAppScope = getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
  final EntityCollectionManager managementCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( managementAppScope );
  Observable<MarkedEdge> edgesObservable = getApplicationInfoEdges( appId );
  //get the graph for all app infos
  Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs = edgesObservable.flatMap( edge -> {
    final Id appInfoId = edge.getTargetNode();
    return managementCollectionManager.load( appInfoId ).filter( entity -> {
      //check for app id
      return entity != null ? entity.getId().getUuid().equals( appId ) : false;
    } );
  } );
  // don't expect many applications, so we block
  org.apache.usergrid.persistence.model.entity.Entity applicationInfo =
    entityObs.toBlocking().lastOrDefault( null );
  if ( applicationInfo == null ) {
    return null;
  }
  Class clazz = Schema.getDefaultSchema().getEntityClass( applicationInfo.getId().getType() );
  org.apache.usergrid.persistence.Entity entity =
    EntityFactory.newEntity( applicationInfo.getId().getUuid(), applicationInfo.getId().getType(), clazz );
  entity.setProperties( CpEntityMapUtils.toMap( applicationInfo ) );
  return entity;
}
