akka.stream.javadsl.Source.log()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.9k)|赞(0)|评价(0)|浏览(105)

本文整理了Java中akka.stream.javadsl.Source.log()方法的一些代码示例,展示了Source.log()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Source.log()方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:log

Source.log介绍

暂无

代码示例

代码示例来源:origin: eclipse/ditto

private static Graph<SourceShape<WithSender>, NotUsed> keepResultAndLogErrors(final Object result) {
  if (result instanceof WithSender) {
    return Source.single((WithSender) result);
  } else if (result instanceof DittoRuntimeException) {
    return Source.single(result)
        .log("PreEnforcer replied DittoRuntimeException")
        .withAttributes(INFO_LEVEL)
        .flatMapConcat(x -> Source.empty());
  } else {
    return Source.single(result)
        .log("PreEnforcer encountered unexpected exception")
        .withAttributes(ERROR_LEVEL)
        .flatMapConcat(x -> Source.empty());
  }
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<ResultList<String>, NotUsed> findAll(final PolicyRestrictedSearchAggregation aggregation) {
  checkNotNull(aggregation, "aggregation");
  final Source<Document, NotUsed> source = aggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, aggregation.getSkip(), aggregation.getLimit()))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<ResultList<String>, NotUsed> findAll(final PolicyRestrictedSearchAggregation aggregation) {
  checkNotNull(aggregation, "aggregation");
  final Source<Document, NotUsed> source = aggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, aggregation.getSkip(), aggregation.getLimit()))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: eclipse/ditto

private void retrieveThingsAndSendResult(final List<String> thingIds,
    @Nullable final JsonFieldSelector selectedFields,
    final Command<?> command, final ActorRef resultReceiver) {
  final DittoHeaders dittoHeaders = command.getDittoHeaders();
  final CompletionStage<?> commandResponseSource = Source.from(thingIds)
      .filter(Objects::nonNull)
      .filterNot(String::isEmpty)
      .filter(thingId -> THING_ID_PATTERN.matcher(thingId).matches())
      .map(thingId -> {
        final Command<?> toBeWrapped;
        if (command instanceof RetrieveThings) {
          toBeWrapped = Optional.ofNullable(selectedFields)
              .map(sf -> RetrieveThing.getBuilder(thingId, dittoHeaders)
                  .withSelectedFields(sf)
                  .build())
              .orElse(RetrieveThing.of(thingId, dittoHeaders));
        } else {
          toBeWrapped = Optional.ofNullable(selectedFields)
              .map(sf -> SudoRetrieveThing.of(thingId, sf, dittoHeaders))
              .orElse(SudoRetrieveThing.of(thingId, dittoHeaders));
        }
        return ConciergeWrapper.wrapForEnforcer(toBeWrapped);
      })
      .ask(calculateParallelism(thingIds), targetActor, Jsonifiable.class,
          Timeout.apply(retrieveSingleThingTimeout.toMillis(), TimeUnit.MILLISECONDS))
      .log("command-response", log)
      .runWith(StreamRefs.sourceRef(), actorMaterializer);
  PatternsCS.pipe(commandResponseSource, aggregatorDispatcher)
      .to(resultReceiver);
}

代码示例来源:origin: eclipse/ditto

@Override
protected final Source<T, NotUsed> createSource(final SudoStreamModifiedEntities command) {
  final String actorName = getSelf().path().name();
  final String unfilteredStreamingLogName = actorName + "unfiltered-streaming";
  final String filteredStreamingLogName = actorName + "filtered-streaming";
  // create a separate cache per stream (don't use member variable!)
  final ComparableCache<String, Long> cache = new ComparableCache<>(streamingCacheSize);
  return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd())
      .log(unfilteredStreamingLogName, log)
      // avoid unnecessary streaming of old sequence numbers
      .filter(pidWithSeqNr ->
          cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()))
      .map(this::mapEntity)
      .log(filteredStreamingLogName, log);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

@Override
protected final Source<T, NotUsed> createSource(final SudoStreamModifiedEntities command) {
  final String actorName = getSelf().path().name();
  final String unfilteredStreamingLogName = actorName + "unfiltered-streaming";
  final String filteredStreamingLogName = actorName + "filtered-streaming";
  // create a separate cache per stream (don't use member variable!)
  final ComparableCache<String, Long> cache = new ComparableCache<>(streamingCacheSize);
  return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd())
      .log(unfilteredStreamingLogName, log)
      // avoid unnecessary streaming of old sequence numbers
      .filter(pidWithSeqNr ->
          cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()))
      .map(this::mapEntity)
      .log(filteredStreamingLogName, log);
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<ResultList<String>, NotUsed> findAll(final Query query) {
  checkNotNull(query, "query");
  final BsonDocument queryFilter = getMongoFilter(query);
  if (log.isDebugEnabled()) {
    log.debug("findAll with query filter <{}>.", queryFilter);
  }
  final Bson filter = and(filterNotDeleted(), queryFilter);
  final Optional<Bson> sortOptions = Optional.of(getMongoSort(query));
  final int limit = query.getLimit();
  final int skip = query.getSkip();
  final Bson projection = new Document(PersistenceConstants.FIELD_ID, 1);
  return Source.fromPublisher(collection.find(filter, Document.class)
      .sort(sortOptions.orElse(null))
      .limit(limit + 1)
      .skip(skip)
      .projection(projection)
      .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS)
  )
      .map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, skip, limit))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<ResultList<String>, NotUsed> findAll(final Query query) {
  checkNotNull(query, "query");
  final BsonDocument queryFilter = getMongoFilter(query);
  if (log.isDebugEnabled()) {
    log.debug("findAll with query filter <{}>.", queryFilter);
  }
  final Bson filter = and(filterNotDeleted(), queryFilter);
  final Optional<Bson> sortOptions = Optional.of(getMongoSort(query));
  final int limit = query.getLimit();
  final int skip = query.getSkip();
  final Bson projection = new Document(PersistenceConstants.FIELD_ID, 1);
  return Source.fromPublisher(collection.find(filter, Document.class)
      .sort(sortOptions.orElse(null))
      .limit(limit + 1)
      .skip(skip)
      .projection(projection)
      .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS)
  )
      .map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, skip, limit))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
  checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");
  final Source<Document, NotUsed> source = policyRestrictedSearchAggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
      .map(countResult -> (Number) countResult)
      .map(Number::longValue) // use Number.longValue() to support both Integer and Long values
      .orElse(Source.<Long>single(0L))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
  checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");
  final Source<Document, NotUsed> source = policyRestrictedSearchAggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
      .map(countResult -> (Number) countResult)
      .map(Number::longValue) // use Number.longValue() to support both Integer and Long values
      .orElse(Source.<Long>single(0L))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<Long, NotUsed> count(final Query query) {
  checkNotNull(query, "query");
  final BsonDocument queryFilter = getMongoFilter(query);
  log.debug("count with query filter <{}>.", queryFilter);
  final Bson filter = and(filterNotDeleted(), queryFilter);
  final CountOptions countOptions = new CountOptions()
      .skip(query.getSkip())
      .limit(query.getLimit())
      .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS);
  return Source.fromPublisher(collection.count(filter, countOptions))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<Long, NotUsed> count(final Query query) {
  checkNotNull(query, "query");
  final BsonDocument queryFilter = getMongoFilter(query);
  log.debug("count with query filter <{}>.", queryFilter);
  final Bson filter = and(filterNotDeleted(), queryFilter);
  final CountOptions countOptions = new CountOptions()
      .skip(query.getSkip())
      .limit(query.getLimit())
      .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS);
  return Source.fromPublisher(collection.count(filter, countOptions))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: eclipse/ditto

.filterNot(el -> el instanceof DittoRuntimeException)
.map(param -> thingPlainJsonSupplier.apply((Jsonifiable<?>) param))
.log("retrieve-thing-response", log)
.recover(new PFBuilder()
    .match(NoSuchElementException.class,

相关文章