本文整理了Java中akka.stream.javadsl.Source.flatMapConcat()
方法的一些代码示例,展示了Source.flatMapConcat()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Source.flatMapConcat()
方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:flatMapConcat
暂无
代码示例来源:origin: eclipse/ditto
/**
* Purge namespaces from some persistent storage.
*
* @param namespaceDescriptors storage-specific identifiers to describe namespaces.
* @return source of any errors during the purge.
*/
default Source<List<Throwable>, NotUsed> purgeAll(final Collection<S> namespaceDescriptors) {
return Source.from(namespaceDescriptors)
.flatMapConcat(this::purge)
.grouped(namespaceDescriptors.size())
.map(errors -> errors.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList()));
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private Source<Success, NotUsed> createIndices(final String collectionName, final List<Index> indices) {
if (indices.isEmpty()) {
return Source.empty();
}
return Source.from(indices)
.flatMapConcat(index -> createIndex(collectionName, index));
}
代码示例来源:origin: eclipse/ditto
private Source<Success, NotUsed> createIndices(final String collectionName, final List<Index> indices) {
if (indices.isEmpty()) {
return Source.empty();
}
return Source.from(indices)
.flatMapConcat(index -> createIndex(collectionName, index));
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private Source<Success, NotUsed> dropIndices(final String collectionName, final List<String> indices) {
if (indices.isEmpty()) {
return Source.empty();
}
return Source.from(indices)
.flatMapConcat(index -> dropIndex(collectionName, index));
}
代码示例来源:origin: eclipse/ditto
private Source<Success, NotUsed> dropIndices(final String collectionName, final List<String> indices) {
if (indices.isEmpty()) {
return Source.empty();
}
return Source.from(indices)
.flatMapConcat(index -> dropIndex(collectionName, index));
}
代码示例来源:origin: eclipse/ditto
private CompletionStage<Done> createNonExistingIndices(final String collectionName,
final List<Index> indices) {
if (indices.isEmpty()) {
LOGGER.warn("No indices are defined, thus no indices are created.");
return CompletableFuture.completedFuture(Done.getInstance());
}
return indexOperations.getIndicesExceptDefaultIndex(collectionName)
.flatMapConcat(
existingIndices -> {
LOGGER.info("Create non-existing indices: Existing indices are: {}", existingIndices);
final List<Index> indicesToCreate = excludeIndices(indices, existingIndices);
LOGGER.info("Indices to create are: {}", indicesToCreate);
return createIndices(collectionName, indicesToCreate);
})
.runWith(Sink.ignore(), materializer);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
.limit(1)
.flatMapConcat(doc -> {
final Date date = doc.getDate(FIELD_TIMESTAMP);
final Instant timestamp = date.toInstant();
LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", timestamp);
return Source.single(Optional.of(timestamp));
})
.orElse(Source.single(Optional.empty()));
}
代码示例来源:origin: eclipse/ditto
private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
return getIndicesExceptDefaultIndex(collectionName)
.flatMapConcat(existingIndices -> {
LOGGER.info("Drop undefined indices - Existing indices are: {}", existingIndices);
final List<String> indicesToDrop = getUndefinedIndexNames(existingIndices, definedIndices);
LOGGER.info("Dropping undefined indices: {}", indicesToDrop);
return dropIndices(collectionName, indicesToDrop);
})
.runWith(Sink.ignore(), materializer);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private CompletionStage<Done> createNonExistingIndices(final String collectionName,
final List<Index> indices) {
if (indices.isEmpty()) {
LOGGER.warn("No indices are defined, thus no indices are created.");
return CompletableFuture.completedFuture(Done.getInstance());
}
return indexOperations.getIndicesExceptDefaultIndex(collectionName)
.flatMapConcat(
existingIndices -> {
LOGGER.info("Create non-existing indices: Existing indices are: {}", existingIndices);
final List<Index> indicesToCreate = excludeIndices(indices, existingIndices);
LOGGER.info("Indices to create are: {}", indicesToCreate);
return createIndices(collectionName, indicesToCreate);
})
.runWith(Sink.ignore(), materializer);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
return getIndicesExceptDefaultIndex(collectionName)
.flatMapConcat(existingIndices -> {
LOGGER.info("Drop undefined indices - Existing indices are: {}", existingIndices);
final List<String> indicesToDrop = getUndefinedIndexNames(existingIndices, definedIndices);
LOGGER.info("Dropping undefined indices: {}", indicesToDrop);
return dropIndices(collectionName, indicesToDrop);
})
.runWith(Sink.ignore(), materializer);
}
代码示例来源:origin: eclipse/ditto
private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
.limit(1)
.flatMapConcat(doc -> {
final Date date = doc.getDate(FIELD_TIMESTAMP);
final Instant timestamp = date.toInstant();
LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", timestamp);
return Source.single(Optional.of(timestamp));
})
.orElse(Source.single(Optional.empty()));
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private CompletionStage<Optional<Throwable>> generateStatusResponse() {
final String id = UUID.randomUUID().toString();
return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
.flatMapConcat(s ->
Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
.map(DeleteResult::getDeletedCount)
)
)
.runWith(Sink.seq(), materializer)
.handle((result, error) -> {
if (error != null) {
return Optional.of(error);
} else if (!Objects.equals(result, Collections.singletonList(1L))) {
final String message = "Expect 1 document inserted and deleted. Found: " + result;
return Optional.of(new IllegalStateException(message));
} else {
return Optional.empty();
}
});
}
代码示例来源:origin: eclipse/ditto
private CompletionStage<Optional<Throwable>> generateStatusResponse() {
final String id = UUID.randomUUID().toString();
return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
.flatMapConcat(s ->
Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
.map(DeleteResult::getDeletedCount)
)
)
.runWith(Sink.seq(), materializer)
.handle((result, error) -> {
if (error != null) {
return Optional.of(error);
} else if (!Objects.equals(result, Collections.singletonList(1L))) {
final String message = "Expect 1 document inserted and deleted. Found: " + result;
return Optional.of(new IllegalStateException(message));
} else {
return Optional.empty();
}
});
}
代码示例来源:origin: eclipse/ditto
private static Graph<SourceShape<WithSender>, NotUsed> keepResultAndLogErrors(final Object result) {
if (result instanceof WithSender) {
return Source.single((WithSender) result);
} else if (result instanceof DittoRuntimeException) {
return Source.single(result)
.log("PreEnforcer replied DittoRuntimeException")
.withAttributes(INFO_LEVEL)
.flatMapConcat(x -> Source.empty());
} else {
return Source.single(result)
.log("PreEnforcer encountered unexpected exception")
.withAttributes(ERROR_LEVEL)
.flatMapConcat(x -> Source.empty());
}
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence
/**
* {@inheritDoc}
*/
@Override
public final Source<Boolean, NotUsed> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
log.debug("Updating policy for Thing: <{}>", thing);
final PolicyUpdate policyUpdate = PolicyUpdateFactory.createPolicyIndexUpdate(thing, policyEnforcer);
return Source.fromPublisher(updatePolicy(thing, policyUpdate))
.flatMapConcat(mapPolicyUpdateResult(policyUpdate))
.recoverWithRetries(1, errorRecovery(getThingId(thing)));
}
代码示例来源:origin: eclipse/ditto
/**
* {@inheritDoc}
*/
@Override
public final Source<Boolean, NotUsed> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
log.debug("Updating policy for Thing: <{}>", thing);
final PolicyUpdate policyUpdate = PolicyUpdateFactory.createPolicyIndexUpdate(thing, policyEnforcer);
return Source.fromPublisher(updatePolicy(thing, policyUpdate))
.flatMapConcat(mapPolicyUpdateResult(policyUpdate))
.recoverWithRetries(1, errorRecovery(getThingId(thing)));
}
代码示例来源:origin: eclipse/ditto
private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
return Source.fromPublisher(collection.updateOne(filter, document))
.flatMapConcat(deleteResult -> {
if (deleteResult.getMatchedCount() <= 0) {
return Source.single(Boolean.FALSE);
}
final PolicyUpdate deletePolicyEntries = PolicyUpdateFactory.createDeleteThingUpdate(thingId);
final Bson policyIndexRemoveFilter = deletePolicyEntries.getPolicyIndexRemoveFilter();
return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
.map(r -> true);
});
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence
private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
return Source.fromPublisher(collection.updateOne(filter, document))
.flatMapConcat(deleteResult -> {
if (deleteResult.getMatchedCount() <= 0) {
return Source.single(Boolean.FALSE);
}
final PolicyUpdate deletePolicyEntries = PolicyUpdateFactory.createDeleteThingUpdate(thingId);
final Bson policyIndexRemoveFilter = deletePolicyEntries.getPolicyIndexRemoveFilter();
return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
.map(r -> true);
});
}
代码示例来源:origin: eclipse/ditto
getContext().become(hasNextBehavior(getSender()));
Source.fromIterator(typedElements::iterator)
.flatMapConcat(this::mapEntity)
.concat(Source.single(DOES_NOT_HAVE_NEXT_MSG))
.mapAsync(1, msgToForward -> {
代码示例来源:origin: eclipse/ditto
private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
Source.single(command)
.to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
HttpRequestActor.COMPLETE_MESSAGE))
.run(materializer);
final CompletionStage<HttpResponse> allThingsCountHttpResponse = Source.fromCompletionStage(httpResponseFuture)
.flatMapConcat(httpResponse -> httpResponse.entity().getDataBytes())
.fold(ByteString.empty(), ByteString::concat)
.map(ByteString::utf8String)
.map(Integer::valueOf)
.map(count -> JsonObject.newBuilder().set("allThingsCount", count).build())
.map(jsonObject -> HttpResponse.create()
.withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(jsonObject.toString()))
.withStatus(HttpStatusCode.OK.toInt()))
.runWith(Sink.head(), materializer);
return completeWithFuture(allThingsCountHttpResponse);
}
内容来源于网络,如有侵权,请联系作者删除!