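This article collects code examples for the Java method akka.stream.javadsl.Source.log() and shows how Source.log() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they are useful as reference material. Details of the Source.log() method: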
Package path: akka.stream.javadsl.Source
Class name: Source
Method name: log
Description: none provided
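Since the page gives no description of the method, here is a minimal sketch of what log does, assuming Akka 2.6 or later is on the classpath (where runWith accepts an ActorSystem directly). log(name) passes elements through unchanged and logs each element, the completion, and any failure under the given name. By default, elements and completion are logged at debug level and failures at error level; withAttributes with Attributes.createLogLevels overrides those levels. The class name SourceLogSketch, the method runOnce, and the stream name "numbers" are made up for illustration.

```java
import akka.actor.ActorSystem;
import akka.event.Logging;
import akka.stream.Attributes;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SourceLogSketch {

    // Runs a three-element stream through log() and returns what reached the sink.
    // Assumes Akka 2.6+, where the ActorSystem can be passed to runWith.
    public static List<Integer> runOnce() throws Exception {
        ActorSystem system = ActorSystem.create("source-log-sketch");
        try {
            return Source.from(Arrays.asList(1, 2, 3))
                    .log("numbers")
                    .withAttributes(Attributes.createLogLevels(
                            Logging.InfoLevel(),    // level used for each element
                            Logging.InfoLevel(),    // level used for stream completion
                            Logging.ErrorLevel()))  // level used for stream failure
                    .runWith(Sink.seq(), system)
                    .toCompletableFuture()
                    .get(5, TimeUnit.SECONDS);
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) throws Exception {
        // log() does not alter the elements, so the sink receives them as emitted.
        System.out.println(runOnce());
    }
}
```

Several of the excerpts on this page call log(name, log) instead, which takes an explicit LoggingAdapter rather than the one derived from the materializer.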
Code example source: eclipse/ditto
private static Graph<SourceShape<WithSender>, NotUsed> keepResultAndLogErrors(final Object result) {
    if (result instanceof WithSender) {
        return Source.single((WithSender) result);
    } else if (result instanceof DittoRuntimeException) {
        return Source.single(result)
                .log("PreEnforcer replied DittoRuntimeException")
                .withAttributes(INFO_LEVEL)
                .flatMapConcat(x -> Source.empty());
    } else {
        return Source.single(result)
                .log("PreEnforcer encountered unexpected exception")
                .withAttributes(ERROR_LEVEL)
                .flatMapConcat(x -> Source.empty());
    }
}
Code example source: org.eclipse.ditto/ditto-services-thingsearch-persistence (also in eclipse/ditto)
@Override
public Source<ResultList<String>, NotUsed> findAll(final PolicyRestrictedSearchAggregation aggregation) {
    checkNotNull(aggregation, "aggregation");
    final Source<Document, NotUsed> source = aggregation.execute(collection, maxQueryTime);
    return source.map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
            .fold(new ArrayList<String>(), (list, id) -> {
                list.add(id);
                return list;
            })
            .map(resultsPlus0ne -> toResultList(resultsPlus0ne, aggregation.getSkip(), aggregation.getLimit()))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("findAll");
}
Code example source: eclipse/ditto
private void retrieveThingsAndSendResult(final List<String> thingIds,
        @Nullable final JsonFieldSelector selectedFields,
        final Command<?> command, final ActorRef resultReceiver) {
    final DittoHeaders dittoHeaders = command.getDittoHeaders();
    final CompletionStage<?> commandResponseSource = Source.from(thingIds)
            .filter(Objects::nonNull)
            .filterNot(String::isEmpty)
            .filter(thingId -> THING_ID_PATTERN.matcher(thingId).matches())
            .map(thingId -> {
                final Command<?> toBeWrapped;
                if (command instanceof RetrieveThings) {
                    toBeWrapped = Optional.ofNullable(selectedFields)
                            .map(sf -> RetrieveThing.getBuilder(thingId, dittoHeaders)
                                    .withSelectedFields(sf)
                                    .build())
                            .orElse(RetrieveThing.of(thingId, dittoHeaders));
                } else {
                    toBeWrapped = Optional.ofNullable(selectedFields)
                            .map(sf -> SudoRetrieveThing.of(thingId, sf, dittoHeaders))
                            .orElse(SudoRetrieveThing.of(thingId, dittoHeaders));
                }
                return ConciergeWrapper.wrapForEnforcer(toBeWrapped);
            })
            .ask(calculateParallelism(thingIds), targetActor, Jsonifiable.class,
                    Timeout.apply(retrieveSingleThingTimeout.toMillis(), TimeUnit.MILLISECONDS))
            .log("command-response", log)
            .runWith(StreamRefs.sourceRef(), actorMaterializer);
    PatternsCS.pipe(commandResponseSource, aggregatorDispatcher)
            .to(resultReceiver);
}
Code example source: eclipse/ditto (also in org.eclipse.ditto/ditto-services-utils-persistence)
@Override
protected final Source<T, NotUsed> createSource(final SudoStreamModifiedEntities command) {
    final String actorName = getSelf().path().name();
    final String unfilteredStreamingLogName = actorName + "unfiltered-streaming";
    final String filteredStreamingLogName = actorName + "filtered-streaming";
    // create a separate cache per stream (don't use member variable!)
    final ComparableCache<String, Long> cache = new ComparableCache<>(streamingCacheSize);
    return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd())
            .log(unfilteredStreamingLogName, log)
            // avoid unnecessary streaming of old sequence numbers
            .filter(pidWithSeqNr ->
                    cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()))
            .map(this::mapEntity)
            .log(filteredStreamingLogName, log);
}
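The filter step in the example above drops entries whose sequence number is not newer than one already seen for the same persistence ID, and the two log calls record the stream before and after that filtering. ComparableCache is a Ditto class whose source is not shown here, so the following is only a sketch of the behaviour its method name suggests. It uses a plain HashMap with no size bound, whereas the real class takes a size argument and so presumably evicts entries. SeqNrCacheSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Ditto's ComparableCache; not the actual implementation.
public class SeqNrCacheSketch<K, V extends Comparable<V>> {

    private final Map<K, V> entries = new HashMap<>();

    // Stores the value and returns true if the key is new or the value is
    // strictly greater than the stored one; otherwise leaves the cache unchanged.
    public boolean updateIfNewOrGreater(K key, V value) {
        V current = entries.get(key);
        if (current == null || value.compareTo(current) > 0) {
            entries.put(key, value);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SeqNrCacheSketch<String, Long> cache = new SeqNrCacheSketch<>();
        System.out.println(cache.updateIfNewOrGreater("thing:1", 5L)); // true: key is new
        System.out.println(cache.updateIfNewOrGreater("thing:1", 3L)); // false: older sequence number
        System.out.println(cache.updateIfNewOrGreater("thing:1", 7L)); // true: newer sequence number
    }
}
```

Because the method returns a boolean and updates the cache as a side effect, it can be passed directly as a filter predicate, which is how the example uses it.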
Code example source: eclipse/ditto (also in org.eclipse.ditto/ditto-services-thingsearch-persistence)
@Override
public Source<ResultList<String>, NotUsed> findAll(final Query query) {
    checkNotNull(query, "query");
    final BsonDocument queryFilter = getMongoFilter(query);
    if (log.isDebugEnabled()) {
        log.debug("findAll with query filter <{}>.", queryFilter);
    }
    final Bson filter = and(filterNotDeleted(), queryFilter);
    final Optional<Bson> sortOptions = Optional.of(getMongoSort(query));
    final int limit = query.getLimit();
    final int skip = query.getSkip();
    final Bson projection = new Document(PersistenceConstants.FIELD_ID, 1);
    return Source.fromPublisher(collection.find(filter, Document.class)
                    .sort(sortOptions.orElse(null))
                    .limit(limit + 1)
                    .skip(skip)
                    .projection(projection)
                    .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS)
            )
            .map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
            .fold(new ArrayList<String>(), (list, id) -> {
                list.add(id);
                return list;
            })
            .map(resultsPlus0ne -> toResultList(resultsPlus0ne, skip, limit))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("findAll");
}
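The query in the example above asks for limit + 1 documents, and the lambda parameter name suggests the extra document is only used to detect whether a further page exists. toResultList is not shown in the excerpt, so the sketch below is an assumption about that pattern, not Ditto's implementation. PageSketch, Page, toPage, and the -1 marker for "no next page" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the "fetch limit + 1" paging pattern.
public class PageSketch {

    public static final long NO_NEXT_PAGE = -1L;

    public static final class Page {
        public final List<String> items;
        public final long nextPageOffset;

        Page(List<String> items, long nextPageOffset) {
            this.items = items;
            this.nextPageOffset = nextPageOffset;
        }
    }

    // If more than `limit` items came back, another page exists: drop the
    // surplus item and report where the next page starts.
    public static Page toPage(List<String> resultsPlusOne, int skip, int limit) {
        if (resultsPlusOne.size() > limit) {
            List<String> items = new ArrayList<>(resultsPlusOne.subList(0, limit));
            return new Page(items, (long) skip + limit);
        }
        return new Page(new ArrayList<>(resultsPlusOne), NO_NEXT_PAGE);
    }

    public static void main(String[] args) {
        Page first = toPage(Arrays.asList("a", "b", "c"), 0, 2);
        System.out.println(first.items + " next=" + first.nextPageOffset);
        Page last = toPage(Arrays.asList("c"), 2, 2);
        System.out.println(last.items + " next=" + last.nextPageOffset);
    }
}
```

Fetching one extra row avoids a separate count query just to decide whether to offer a next page.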
Code example source: eclipse/ditto (also in org.eclipse.ditto/ditto-services-thingsearch-persistence)
@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
    checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");
    final Source<Document, NotUsed> source = policyRestrictedSearchAggregation.execute(collection, maxQueryTime);
    return source.map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
            .map(countResult -> (Number) countResult)
            .map(Number::longValue) // use Number.longValue() to support both Integer and Long values
            .orElse(Source.<Long>single(0L))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("count");
}
Code example source: eclipse/ditto (also in org.eclipse.ditto/ditto-services-thingsearch-persistence)
@Override
public Source<Long, NotUsed> count(final Query query) {
    checkNotNull(query, "query");
    final BsonDocument queryFilter = getMongoFilter(query);
    log.debug("count with query filter <{}>.", queryFilter);
    final Bson filter = and(filterNotDeleted(), queryFilter);
    final CountOptions countOptions = new CountOptions()
            .skip(query.getSkip())
            .limit(query.getLimit())
            .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS);
    return Source.fromPublisher(collection.count(filter, countOptions))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("count");
}
Code example source: eclipse/ditto
.filterNot(el -> el instanceof DittoRuntimeException)
.map(param -> thingPlainJsonSupplier.apply((Jsonifiable<?>) param))
.log("retrieve-thing-response", log)
.recover(new PFBuilder()
.match(NoSuchElementException.class,