Usage and code examples of the akka.stream.javadsl.Source.filter() method

Reposted by x33g5p2x on 2022-01-30, category: Other

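This article collects code examples of the akka.stream.javadsl.Source.filter() method in Java and shows how Source.filter() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they can serve as a practical reference. Details of the Source.filter() method:
Fully qualified name: akka.stream.javadsl.Source
Class name: Source
Method name: filter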

Introduction to Source.filter

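No description is available. In Akka Streams, filter passes an element downstream only when the given predicate returns true for it; elements that fail the predicate are dropped.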

Code examples

Code example source: apptik/RHub

@Override
public <T> Source pub(Class<T> filterClass) {
  // keep only elements whose runtime class is filterClass or a subtype of it
  return source.filter(o -> filterClass.isAssignableFrom(o.getClass()));
}
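The class-based predicate in pub() can be tried in isolation. The following is a minimal sketch, not RHub code: it uses java.util.stream in place of an Akka Source so that it runs without extra dependencies, and the names ClassFilterSketch and filterByClass are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in for the Akka Source; only the predicate is the point here.
final class ClassFilterSketch {

    // Keeps the elements whose runtime class is filterClass or a subtype of it,
    // using the same predicate as the pub() snippet.
    static <T> List<T> filterByClass(List<Object> items, Class<T> filterClass) {
        return items.stream()
                .filter(o -> filterClass.isAssignableFrom(o.getClass()))
                .map(filterClass::cast) // safe: the predicate already checked the type
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList("a", 1, 2.5, "b", 7L);
        System.out.println(filterByClass(mixed, String.class)); // prints [a, b]
        System.out.println(filterByClass(mixed, Number.class)); // prints [1, 2.5, 7]
    }
}
```

For non-null elements, filterClass.isInstance(o) gives the same result as filterClass.isAssignableFrom(o.getClass()) and may read more directly.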

Code example source: eclipse/ditto

private void retrieveThingsAndSendResult(final List<String> thingIds,
    @Nullable final JsonFieldSelector selectedFields,
    final Command<?> command, final ActorRef resultReceiver) {
  final DittoHeaders dittoHeaders = command.getDittoHeaders();
  final CompletionStage<?> commandResponseSource = Source.from(thingIds)
      // keep only non-null, non-empty IDs that match the expected ID pattern
      .filter(Objects::nonNull)
      .filterNot(String::isEmpty)
      .filter(thingId -> THING_ID_PATTERN.matcher(thingId).matches())
      .map(thingId -> {
        final Command<?> toBeWrapped;
        if (command instanceof RetrieveThings) {
          toBeWrapped = Optional.ofNullable(selectedFields)
              .map(sf -> RetrieveThing.getBuilder(thingId, dittoHeaders)
                  .withSelectedFields(sf)
                  .build())
              .orElse(RetrieveThing.of(thingId, dittoHeaders));
        } else {
          toBeWrapped = Optional.ofNullable(selectedFields)
              .map(sf -> SudoRetrieveThing.of(thingId, sf, dittoHeaders))
              .orElse(SudoRetrieveThing.of(thingId, dittoHeaders));
        }
        return ConciergeWrapper.wrapForEnforcer(toBeWrapped);
      })
      .ask(calculateParallelism(thingIds), targetActor, Jsonifiable.class,
          Timeout.apply(retrieveSingleThingTimeout.toMillis(), TimeUnit.MILLISECONDS))
      .log("command-response", log)
      .runWith(StreamRefs.sourceRef(), actorMaterializer);
  PatternsCS.pipe(commandResponseSource, aggregatorDispatcher)
      .to(resultReceiver);
}
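The first three operators of that chain form a small validation pipeline: drop nulls, drop empty strings, then keep only IDs that match a pattern. The sketch below reproduces that sequence with java.util.stream rather than Akka, under stated assumptions: ID_PATTERN is a hypothetical regular expression chosen for illustration, not Ditto's actual THING_ID_PATTERN, and IdFilterSketch and validIds are invented names.

```java
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class IdFilterSketch {

    // Hypothetical pattern for illustration only; not Ditto's real THING_ID_PATTERN.
    static final Pattern ID_PATTERN = Pattern.compile("[a-z]+:[a-z0-9]+");

    static List<String> validIds(List<String> ids) {
        return ids.stream()
                .filter(Objects::nonNull)                       // drop nulls first
                .filter(id -> !id.isEmpty())                    // same effect as filterNot(String::isEmpty)
                .filter(id -> ID_PATTERN.matcher(id).matches()) // whole-string regex match
                .collect(Collectors.toList());
    }
}
```

The order matters: the null check has to come before the two predicates that call methods on the element.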

Code example source: apptik/RHub

@Override
@SuppressWarnings("unchecked")
protected <T> Publisher<T> filter(Processor processor, final Class<T> filterClass) {
  // wrap the processor as a Source and keep only instances of filterClass
  Source src = Source.fromPublisher(processor)
      .filter(o -> filterClass.isAssignableFrom(o.getClass()));
  return (Publisher<T>) src.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}

Code example source: eclipse/ditto

// excerpt from the middle of a longer method chain
    return NotUsed.getInstance();
  })
  .filter(jsonifiable -> jsonifiable instanceof ThingEvent)
  .map(jsonifiable -> ((ThingEvent) jsonifiable))
  .filter(thingEvent -> targetThingIds.isEmpty() || // rest of this predicate is missing from the excerpt
  .filter(thingEvent -> namespaces.isEmpty() || namespaces.contains(namespaceFromId(thingEvent)))
  .map(ThingEventToThingConverter::thingEventToThing)
  .filter(Optional::isPresent)
  .map(Optional::get)
  .map(thing -> fieldSelector != null ? thing.toJson(jsonSchemaVersion, fieldSelector) :
      thing.toJson(jsonSchemaVersion))
  .filter(thingJson -> fieldSelector == null || fieldSelector.getPointers().stream() // rest of this predicate is missing from the excerpt
  .filter(thingJson -> !thingJson.isEmpty()) // avoid sending back empty jsonValues
  .map(jsonValue -> ServerSentEvent.create(jsonValue.toString()))
  .keepAlive(Duration.ofSeconds(1), ServerSentEvent::heartbeat);
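The excerpt pairs filter(Optional::isPresent) with map(Optional::get) to discard elements for which a conversion produced no result. A minimal sketch of that idiom follows, again with java.util.stream instead of Akka; parse is a hypothetical converter standing in for ThingEventToThingConverter::thingEventToThing, and the class name is invented.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class OptionalFilterSketch {

    // Hypothetical converter: yields a value only for inputs that parse as integers.
    static Optional<Integer> parse(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    static List<Integer> parseAll(List<String> inputs) {
        return inputs.stream()
                .map(OptionalFilterSketch::parse)
                .filter(Optional::isPresent) // drop failed conversions
                .map(Optional::get)          // safe: only present values remain
                .collect(Collectors.toList());
    }
}
```

Calling Optional::get is only safe here because the preceding filter guarantees that every remaining Optional holds a value.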

Code example source: eclipse/ditto

@Override
protected final Source<T, NotUsed> createSource(final SudoStreamModifiedEntities command) {
  final String actorName = getSelf().path().name();
  final String unfilteredStreamingLogName = actorName + "unfiltered-streaming";
  final String filteredStreamingLogName = actorName + "filtered-streaming";
  // create a separate cache per stream (don't use member variable!)
  final ComparableCache<String, Long> cache = new ComparableCache<>(streamingCacheSize);
  return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd())
      .log(unfilteredStreamingLogName, log)
      // avoid unnecessary streaming of old sequence numbers
      .filter(pidWithSeqNr ->
          cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()))
      .map(this::mapEntity)
      .log(filteredStreamingLogName, log);
}
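The filter in createSource relies on a stateful predicate: updateIfNewOrGreater both decides whether an element passes and records what it has seen. The sketch below shows one way such a predicate could behave. It is an assumption inferred from the method name, not Ditto's ComparableCache: the class SeqNrCacheSketch is hypothetical, and unlike the original (which takes a size argument) it is unbounded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Ditto's ComparableCache; the real class may differ.
final class SeqNrCacheSketch {

    private final Map<String, Long> latest = new HashMap<>();

    // Returns true and records seqNr only if id is new or seqNr is greater
    // than the value stored for id; otherwise leaves the state unchanged.
    boolean updateIfNewOrGreater(String id, long seqNr) {
        Long previous = latest.get(id);
        if (previous == null || seqNr > previous) {
            latest.put(id, seqNr);
            return true;
        }
        return false;
    }
}
```

Because the predicate mutates state, sharing one cache between streams would let one stream suppress elements of another, which matches the original comment about creating a separate cache per stream.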
