akka.stream.javadsl.Source.flatMapConcat()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(11.1k)|赞(0)|评价(0)|浏览(126)

本文整理了Java中akka.stream.javadsl.Source.flatMapConcat()方法的一些代码示例,展示了Source.flatMapConcat()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Source.flatMapConcat()方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:flatMapConcat

Source.flatMapConcat介绍

暂无

代码示例

代码示例来源:origin: eclipse/ditto

/**
 * Purge namespaces from some persistent storage.
 *
 * @param namespaceDescriptors storage-specific identifiers to describe namespaces.
 * @return source of any errors during the purge.
 */
default Source<List<Throwable>, NotUsed> purgeAll(final Collection<S> namespaceDescriptors) {
  return Source.from(namespaceDescriptors)
      .flatMapConcat(this::purge)
      .grouped(namespaceDescriptors.size())
      .map(errors -> errors.stream()
          .filter(Optional::isPresent)
          .map(Optional::get)
          .collect(Collectors.toList()));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private Source<Success, NotUsed> createIndices(final String collectionName, final List<Index> indices) {
  if (indices.isEmpty()) {
    return Source.empty();
  }
  return Source.from(indices)
      .flatMapConcat(index -> createIndex(collectionName, index));
}

代码示例来源:origin: eclipse/ditto

private Source<Success, NotUsed> createIndices(final String collectionName, final List<Index> indices) {
  if (indices.isEmpty()) {
    return Source.empty();
  }
  return Source.from(indices)
      .flatMapConcat(index -> createIndex(collectionName, index));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private Source<Success, NotUsed> dropIndices(final String collectionName, final List<String> indices) {
  if (indices.isEmpty()) {
    return Source.empty();
  }
  return Source.from(indices)
      .flatMapConcat(index -> dropIndex(collectionName, index));
}

代码示例来源:origin: eclipse/ditto

private Source<Success, NotUsed> dropIndices(final String collectionName, final List<String> indices) {
  if (indices.isEmpty()) {
    return Source.empty();
  }
  return Source.from(indices)
      .flatMapConcat(index -> dropIndex(collectionName, index));
}

代码示例来源:origin: eclipse/ditto

private CompletionStage<Done> createNonExistingIndices(final String collectionName,
    final List<Index> indices) {
  if (indices.isEmpty()) {
    LOGGER.warn("No indices are defined, thus no indices are created.");
    return CompletableFuture.completedFuture(Done.getInstance());
  }
  return indexOperations.getIndicesExceptDefaultIndex(collectionName)
      .flatMapConcat(
          existingIndices -> {
            LOGGER.info("Create non-existing indices: Existing indices are: {}", existingIndices);
            final List<Index> indicesToCreate = excludeIndices(indices, existingIndices);
            LOGGER.info("Indices to create are: {}", indicesToCreate);
            return createIndices(collectionName, indicesToCreate);
          })
      .runWith(Sink.ignore(), materializer);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
  return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
      .limit(1)
      .flatMapConcat(doc -> {
        final Date date = doc.getDate(FIELD_TIMESTAMP);
        final Instant timestamp = date.toInstant();
        LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", timestamp);
        return Source.single(Optional.of(timestamp));
      })
      .orElse(Source.single(Optional.empty()));
}

代码示例来源:origin: eclipse/ditto

private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
  return getIndicesExceptDefaultIndex(collectionName)
      .flatMapConcat(existingIndices -> {
        LOGGER.info("Drop undefined indices - Existing indices are: {}", existingIndices);
        final List<String> indicesToDrop = getUndefinedIndexNames(existingIndices, definedIndices);
        LOGGER.info("Dropping undefined indices: {}", indicesToDrop);
        return dropIndices(collectionName, indicesToDrop);
      })
      .runWith(Sink.ignore(), materializer);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private CompletionStage<Done> createNonExistingIndices(final String collectionName,
    final List<Index> indices) {
  if (indices.isEmpty()) {
    LOGGER.warn("No indices are defined, thus no indices are created.");
    return CompletableFuture.completedFuture(Done.getInstance());
  }
  return indexOperations.getIndicesExceptDefaultIndex(collectionName)
      .flatMapConcat(
          existingIndices -> {
            LOGGER.info("Create non-existing indices: Existing indices are: {}", existingIndices);
            final List<Index> indicesToCreate = excludeIndices(indices, existingIndices);
            LOGGER.info("Indices to create are: {}", indicesToCreate);
            return createIndices(collectionName, indicesToCreate);
          })
      .runWith(Sink.ignore(), materializer);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
  return getIndicesExceptDefaultIndex(collectionName)
      .flatMapConcat(existingIndices -> {
        LOGGER.info("Drop undefined indices - Existing indices are: {}", existingIndices);
        final List<String> indicesToDrop = getUndefinedIndexNames(existingIndices, definedIndices);
        LOGGER.info("Dropping undefined indices: {}", indicesToDrop);
        return dropIndices(collectionName, indicesToDrop);
      })
      .runWith(Sink.ignore(), materializer);
}

代码示例来源:origin: eclipse/ditto

private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
  return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
      .limit(1)
      .flatMapConcat(doc -> {
        final Date date = doc.getDate(FIELD_TIMESTAMP);
        final Instant timestamp = date.toInstant();
        LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", timestamp);
        return Source.single(Optional.of(timestamp));
      })
      .orElse(Source.single(Optional.empty()));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private CompletionStage<Optional<Throwable>> generateStatusResponse() {
  final String id = UUID.randomUUID().toString();
  return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
      .flatMapConcat(s ->
          Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
              Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
                  .map(DeleteResult::getDeletedCount)
          )
      )
      .runWith(Sink.seq(), materializer)
      .handle((result, error) -> {
        if (error != null) {
          return Optional.of(error);
        } else if (!Objects.equals(result, Collections.singletonList(1L))) {
          final String message = "Expect 1 document inserted and deleted. Found: " + result;
          return Optional.of(new IllegalStateException(message));
        } else {
          return Optional.empty();
        }
      });
}

代码示例来源:origin: eclipse/ditto

private CompletionStage<Optional<Throwable>> generateStatusResponse() {
  final String id = UUID.randomUUID().toString();
  return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
      .flatMapConcat(s ->
          Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
              Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
                  .map(DeleteResult::getDeletedCount)
          )
      )
      .runWith(Sink.seq(), materializer)
      .handle((result, error) -> {
        if (error != null) {
          return Optional.of(error);
        } else if (!Objects.equals(result, Collections.singletonList(1L))) {
          final String message = "Expect 1 document inserted and deleted. Found: " + result;
          return Optional.of(new IllegalStateException(message));
        } else {
          return Optional.empty();
        }
      });
}

代码示例来源:origin: eclipse/ditto

private static Graph<SourceShape<WithSender>, NotUsed> keepResultAndLogErrors(final Object result) {
  if (result instanceof WithSender) {
    return Source.single((WithSender) result);
  } else if (result instanceof DittoRuntimeException) {
    return Source.single(result)
        .log("PreEnforcer replied DittoRuntimeException")
        .withAttributes(INFO_LEVEL)
        .flatMapConcat(x -> Source.empty());
  } else {
    return Source.single(result)
        .log("PreEnforcer encountered unexpected exception")
        .withAttributes(ERROR_LEVEL)
        .flatMapConcat(x -> Source.empty());
  }
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * {@inheritDoc}
 */
@Override
public final Source<Boolean, NotUsed> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
  log.debug("Updating policy for Thing: <{}>", thing);
  final PolicyUpdate policyUpdate = PolicyUpdateFactory.createPolicyIndexUpdate(thing, policyEnforcer);
  return Source.fromPublisher(updatePolicy(thing, policyUpdate))
      .flatMapConcat(mapPolicyUpdateResult(policyUpdate))
      .recoverWithRetries(1, errorRecovery(getThingId(thing)));
}

代码示例来源:origin: eclipse/ditto

/**
 * {@inheritDoc}
 */
@Override
public final Source<Boolean, NotUsed> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
  log.debug("Updating policy for Thing: <{}>", thing);
  final PolicyUpdate policyUpdate = PolicyUpdateFactory.createPolicyIndexUpdate(thing, policyEnforcer);
  return Source.fromPublisher(updatePolicy(thing, policyUpdate))
      .flatMapConcat(mapPolicyUpdateResult(policyUpdate))
      .recoverWithRetries(1, errorRecovery(getThingId(thing)));
}

代码示例来源:origin: eclipse/ditto

private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
  return Source.fromPublisher(collection.updateOne(filter, document))
      .flatMapConcat(deleteResult -> {
        if (deleteResult.getMatchedCount() <= 0) {
          return Source.single(Boolean.FALSE);
        }
        final PolicyUpdate deletePolicyEntries = PolicyUpdateFactory.createDeleteThingUpdate(thingId);
        final Bson policyIndexRemoveFilter = deletePolicyEntries.getPolicyIndexRemoveFilter();
        return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
            .map(r -> true);
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
  return Source.fromPublisher(collection.updateOne(filter, document))
      .flatMapConcat(deleteResult -> {
        if (deleteResult.getMatchedCount() <= 0) {
          return Source.single(Boolean.FALSE);
        }
        final PolicyUpdate deletePolicyEntries = PolicyUpdateFactory.createDeleteThingUpdate(thingId);
        final Bson policyIndexRemoveFilter = deletePolicyEntries.getPolicyIndexRemoveFilter();
        return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
            .map(r -> true);
      });
}

代码示例来源:origin: eclipse/ditto

getContext().become(hasNextBehavior(getSender()));
Source.fromIterator(typedElements::iterator)
    .flatMapConcat(this::mapEntity)
    .concat(Source.single(DOES_NOT_HAVE_NEXT_MSG))
    .mapAsync(1, msgToForward -> {

代码示例来源:origin: eclipse/ditto

private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
  final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
  Source.single(command)
      .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
          HttpRequestActor.COMPLETE_MESSAGE))
      .run(materializer);
  final CompletionStage<HttpResponse> allThingsCountHttpResponse = Source.fromCompletionStage(httpResponseFuture)
      .flatMapConcat(httpResponse -> httpResponse.entity().getDataBytes())
      .fold(ByteString.empty(), ByteString::concat)
      .map(ByteString::utf8String)
      .map(Integer::valueOf)
      .map(count -> JsonObject.newBuilder().set("allThingsCount", count).build())
      .map(jsonObject -> HttpResponse.create()
          .withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(jsonObject.toString()))
          .withStatus(HttpStatusCode.OK.toInt()))
      .runWith(Sink.head(), materializer);
  return completeWithFuture(allThingsCountHttpResponse);
}

相关文章