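This article collects code examples for the Java method akka.stream.javadsl.Source.filter() and shows how Source.filter() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as a practical reference. Details of the Source.filter() method are as follows: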
Package path: akka.stream.javadsl.Source
Class name: Source
Method name: filter
Description: none provided
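Since no description is given here, a minimal sketch of the basic behaviour may help: filter() passes downstream only those elements for which the predicate returns true and drops the rest. The sketch assumes Akka 2.6 or later (where runWith accepts the ActorSystem directly) and the Akka Streams dependency on the classpath; the class name SourceFilterBasics and the system name "filter-demo" are made up for illustration.

```java
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from any of the projects cited in this article.
final class SourceFilterBasics {

    // Runs a small finite stream and returns the elements that pass the predicate.
    static List<Integer> evenNumbers(ActorSystem system) throws Exception {
        return Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
                .filter(n -> n % 2 == 0)       // keep elements for which the predicate is true
                .runWith(Sink.seq(), system)   // assumption: Akka 2.6+, the system supplies the materializer
                .toCompletableFuture()
                .get(3, TimeUnit.SECONDS);     // block only because this is a demo
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("filter-demo");
        try {
            System.out.println(evenNumbers(system)); // the even numbers 2, 4 and 6
        } finally {
            system.terminate();
        }
    }
}
```

With Akka 2.5, as used in some of the examples below, a Materializer would be passed to runWith instead of the system.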
Code example source: origin: apptik/RHub
@Override
public <T> Source pub(Class<T> filterClass) {
    // keep only elements whose runtime class is filterClass or a subtype of it
    return source.filter(o -> filterClass.isAssignableFrom(o.getClass()));
}
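To make the class-based predicate in the snippet above easier to try out, here is a rough sketch that applies the same test with java.util.stream as a stand-in for an Akka Source, so it runs without any Akka dependency. ClassFilterSketch, instanceOf and keep are hypothetical names. Note that Source.filter() takes an akka.japi.function.Predicate, so in Akka code the same lambda body would be used rather than the java.util.function.Predicate object built here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical demo class; java.util.stream stands in for an Akka Source.
final class ClassFilterSketch {

    // Same test as in the snippet: true when o's runtime class is filterClass or a subtype.
    static <T> Predicate<Object> instanceOf(Class<T> filterClass) {
        return o -> filterClass.isAssignableFrom(o.getClass());
    }

    // Returns the items that pass the class test, in their original order.
    static List<Object> keep(List<Object> items, Class<?> filterClass) {
        return items.stream()
                .filter(instanceOf(filterClass))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList(1, "two", 3.0, 4L);
        System.out.println(keep(mixed, Number.class)); // [1, 3.0, 4]
        System.out.println(keep(mixed, String.class)); // [two]
    }
}
```

For non-null elements, filterClass::isInstance gives the same result and reads a little more directly.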
Code example source: origin: eclipse/ditto
private void retrieveThingsAndSendResult(final List<String> thingIds,
        @Nullable final JsonFieldSelector selectedFields,
        final Command<?> command, final ActorRef resultReceiver) {
    final DittoHeaders dittoHeaders = command.getDittoHeaders();
    final CompletionStage<?> commandResponseSource = Source.from(thingIds)
            // drop null, empty and syntactically invalid IDs before asking the target actor
            .filter(Objects::nonNull)
            .filterNot(String::isEmpty)
            .filter(thingId -> THING_ID_PATTERN.matcher(thingId).matches())
            .map(thingId -> {
                final Command<?> toBeWrapped;
                if (command instanceof RetrieveThings) {
                    toBeWrapped = Optional.ofNullable(selectedFields)
                            .map(sf -> RetrieveThing.getBuilder(thingId, dittoHeaders)
                                    .withSelectedFields(sf)
                                    .build())
                            .orElse(RetrieveThing.of(thingId, dittoHeaders));
                } else {
                    toBeWrapped = Optional.ofNullable(selectedFields)
                            .map(sf -> SudoRetrieveThing.of(thingId, sf, dittoHeaders))
                            .orElse(SudoRetrieveThing.of(thingId, dittoHeaders));
                }
                return ConciergeWrapper.wrapForEnforcer(toBeWrapped);
            })
            .ask(calculateParallelism(thingIds), targetActor, Jsonifiable.class,
                    Timeout.apply(retrieveSingleThingTimeout.toMillis(), TimeUnit.MILLISECONDS))
            .log("command-response", log)
            .runWith(StreamRefs.sourceRef(), actorMaterializer);
    PatternsCS.pipe(commandResponseSource, aggregatorDispatcher)
            .to(resultReceiver);
}
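The first three stages of the pipeline above form a small validation chain: drop nulls, drop empty strings, then keep only IDs that match a pattern. A sketch of that chain in isolation might look as follows. It uses java.util.stream as a stand-in for the Akka Source so it runs on its own, and a negated filter in place of filterNot. ID_PATTERN is an invented pattern, since the snippet does not show THING_ID_PATTERN, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical demo class; java.util.stream stands in for an Akka Source.
final class IdFilterChainSketch {

    // Assumption: an invented "namespace:name" pattern, NOT Ditto's real THING_ID_PATTERN.
    static final Pattern ID_PATTERN = Pattern.compile("[a-z]+:[a-z0-9-]+");

    // Applies the three checks in the same order as the snippet above.
    static List<String> validIds(List<String> ids) {
        return ids.stream()
                .filter(Objects::nonNull)                        // like .filter(Objects::nonNull)
                .filter(id -> !id.isEmpty())                     // plays the role of .filterNot(String::isEmpty)
                .filter(id -> ID_PATTERN.matcher(id).matches())  // like the THING_ID_PATTERN check
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("org:thing-1", null, "", "no namespace", "org:thing-2");
        System.out.println(validIds(ids)); // [org:thing-1, org:thing-2]
    }
}
```

Ordering the cheap checks first means the regular expression only runs on non-null, non-empty candidates, and the null check protects the later stages from a NullPointerException.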
Code example source: origin: apptik/RHub
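@Override
@SuppressWarnings("unchecked")
protected <T> Publisher<T> filter(Processor processor, final Class<T> filterClass) {
    // wrap the Reactive Streams processor in a Source and keep only instances of filterClass
    Source src = Source.fromPublisher(processor)
            .filter(o -> filterClass.isAssignableFrom(o.getClass()));
    // expose the filtered stream as a Publisher that supports multiple subscribers
    return (Publisher<T>) src.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}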
Code example source: origin: eclipse/ditto
            return NotUsed.getInstance();
        })
        .filter(jsonifiable -> jsonifiable instanceof ThingEvent)
        .map(jsonifiable -> ((ThingEvent) jsonifiable))
        .filter(thingEvent -> targetThingIds.isEmpty() || // (rest of this predicate is missing in the source)
        .filter(thingEvent -> namespaces.isEmpty() || namespaces.contains(namespaceFromId(thingEvent)))
        .map(ThingEventToThingConverter::thingEventToThing)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .map(thing -> fieldSelector != null ? thing.toJson(jsonSchemaVersion, fieldSelector) :
                thing.toJson(jsonSchemaVersion))
        .filter(thingJson -> fieldSelector == null || fieldSelector.getPointers().stream() // (rest of this predicate is missing in the source)
        .filter(thingJson -> !thingJson.isEmpty()) // avoid sending back empty jsonValues
        .map(jsonValue -> ServerSentEvent.create(jsonValue.toString()))
        .keepAlive(Duration.ofSeconds(1), ServerSentEvent::heartbeat);
Code example source: origin: eclipse/ditto
@Override
protected final Source<T, NotUsed> createSource(final SudoStreamModifiedEntities command) {
    final String actorName = getSelf().path().name();
    final String unfilteredStreamingLogName = actorName + "unfiltered-streaming";
    final String filteredStreamingLogName = actorName + "filtered-streaming";
    // create a separate cache per stream (don't use member variable!)
    final ComparableCache<String, Long> cache = new ComparableCache<>(streamingCacheSize);
    return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd())
            .log(unfilteredStreamingLogName, log)
            // avoid unnecessary streaming of old sequence numbers
            .filter(pidWithSeqNr ->
                    cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()))
            .map(this::mapEntity)
            .log(filteredStreamingLogName, log);
}
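The filter above is stateful: the predicate consults and updates a cache, so an element passes only if its sequence number is new for that persistence ID or greater than the one last seen. The sketch below illustrates the idea under stated assumptions. LatestSeqNrs is a hypothetical, unbounded stand-in for Ditto's ComparableCache (whose real implementation is size-limited and not shown in this article), it assumes equal sequence numbers are rejected, and java.util.stream again replaces the Akka Source so the example runs on its own.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; not Ditto code.
final class SeqNrFilterSketch {

    // Hypothetical stand-in for ComparableCache: remembers the highest sequence number per ID.
    static final class LatestSeqNrs {
        private final Map<String, Long> latest = new HashMap<>();

        // Returns true (and stores seqNr) if pid is unknown or seqNr is strictly greater.
        boolean updateIfNewOrGreater(String pid, long seqNr) {
            Long known = latest.get(pid);
            if (known == null || seqNr > known) {
                latest.put(pid, seqNr);
                return true;
            }
            return false;
        }
    }

    static Map.Entry<String, Long> entry(String pid, long seqNr) {
        return new AbstractMap.SimpleEntry<>(pid, Long.valueOf(seqNr));
    }

    // Keeps only entries that advance the sequence number of their persistence ID.
    static List<String> newestOnly(List<Map.Entry<String, Long>> entries) {
        LatestSeqNrs cache = new LatestSeqNrs(); // one cache per run, as in the snippet
        return entries.stream()                  // sequential stream, so the stateful predicate sees input order
                .filter(e -> cache.updateIfNewOrGreater(e.getKey(), e.getValue()))
                .map(e -> e.getKey() + "@" + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> in = Arrays.asList(
                entry("a", 1), entry("a", 3), entry("b", 2),
                entry("a", 2), entry("b", 2), entry("b", 5));
        System.out.println(newestOnly(in)); // [a@1, a@3, b@2, b@5]
    }
}
```

Creating the cache inside the method mirrors the comment in the original code: each stream gets its own state, so concurrent streams do not suppress each other's elements.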
Code example source: origin: org.eclipse.ditto/ditto-services-utils-persistence (code identical to the preceding eclipse/ditto example)