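This article collects code examples for the Java method akka.stream.javadsl.Source.fold() and shows how Source.fold() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they can serve as a practical reference. Details of the Source.fold() method: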
Package: akka.stream.javadsl
Class name: Source
Method name: fold
Description: none provided.
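For orientation: as far as the Akka Streams documentation describes it, `fold(zero, f)` starts from `zero`, combines the running value with each incoming element via `f`, and emits one result only once upstream completes. The sketch below models that contract with plain JDK types so that it runs without Akka on the classpath. `FoldSketch` and its `fold` helper are hypothetical names used for illustration and are not part of Akka's API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-JDK model of the fold contract; this is not Akka code. */
public class FoldSketch {

    /** Starts at zero, applies f once per element, and returns the single final value. */
    public static <T, R> R fold(Iterable<T> elements, R zero, BiFunction<R, T, R> f) {
        R acc = zero;
        for (T element : elements) {
            acc = f.apply(acc, element);
        }
        return acc; // produced only after every element has been consumed
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

        int sum = fold(numbers, 0, Integer::sum);
        String joined = fold(numbers, "", (acc, n) -> acc + n);
        int fromEmpty = fold(Collections.<Integer>emptyList(), 42, Integer::sum);

        System.out.println(sum);       // 10
        System.out.println(joined);    // 1234
        System.out.println(fromEmpty); // 42: with no elements, zero is returned unchanged
    }
}
```

In the Akka Java DSL the corresponding call would look roughly like `Source.from(numbers).fold(0, Integer::sum)`, with the downstream stage receiving a single element.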
Code example source: eclipse/ditto (also found in org.eclipse.ditto/ditto-services-utils-persistence)
/**
 * Gets all indices defined on the given collection, including the default index defined on "_id".
 *
 * @param collectionName the name of the collection.
 * @return a source which emits the list of found indices.
 */
public Source<List<Index>, NotUsed> getIndices(final String collectionName) {
    return Source.fromPublisher(getCollection(collectionName).listIndexes())
            .map(Index::indexInfoOf)
            .fold(new ArrayList<Index>(), (aggregate, element) -> {
                aggregate.add(element);
                return aggregate;
            });
}
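One thing to watch in this pattern: the `new ArrayList<Index>()` passed as `zero` is created once, when the `Source` is built. An Akka `Source` is a reusable blueprint, so if the same returned `Source` were run more than once, each run would presumably start from that same list instance and keep appending to it. The plain-JDK sketch below models the hazard and one way around it. `SharedZeroSketch`, `runMutating`, and `runCopying` are hypothetical names, and the code is an illustration rather than Akka's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-JDK model of running a fold "blueprint" repeatedly; this is not Akka code. */
public class SharedZeroSketch {

    /** Mutating fold: every run appends to the one zero list captured at build time. */
    public static List<String> runMutating(List<String> zero, List<String> elements) {
        List<String> acc = zero;
        for (String element : elements) {
            acc.add(element); // mutates the shared zero instance
        }
        return acc;
    }

    /** Copying fold: each step returns a fresh list, so zero is never modified. */
    public static List<String> runCopying(List<String> zero, List<String> elements) {
        List<String> acc = zero;
        for (String element : elements) {
            List<String> next = new ArrayList<>(acc);
            next.add(element);
            acc = next;
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("_id", "name");

        // Two runs against the same mutable zero: the second run sees leftovers.
        List<String> sharedZero = new ArrayList<>();
        runMutating(sharedZero, elements);
        System.out.println(runMutating(sharedZero, elements)); // [_id, name, _id, name]

        // Two runs against an immutable zero: each run is independent.
        List<String> emptyZero = Collections.emptyList();
        runCopying(emptyZero, elements);
        System.out.println(runCopying(emptyZero, elements)); // [_id, name]
    }
}
```

In `getIndices` a new `Source` is created on every call, so the issue would only surface if a caller ran one returned `Source` repeatedly. Copying on every step costs quadratic time for long streams; when the list is only needed at the end of the stream, collecting with `Sink.seq()` may be a simpler alternative.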
Code example source: org.eclipse.ditto/ditto-services-thingsearch-persistence (also found in eclipse/ditto)
@Override
public Source<SearchNamespaceReportResult, NotUsed> generateNamespaceCountReport() {
    final AggregatePublisher<Document> aggregatePublisher = collection.aggregate(
            Collections.singletonList(
                    new Document("$group",
                            new Document(PersistenceConstants.FIELD_ID, "$_namespace")
                                    .append(PersistenceConstants.FIELD_COUNT, new Document("$sum", 1))
                    )
            )
    );
    return Source.fromPublisher(aggregatePublisher)
            .map(document -> {
                final String namespace = (document.get(PersistenceConstants.FIELD_ID) != null)
                        ? document.get(PersistenceConstants.FIELD_ID).toString()
                        : "NOT_MIGRATED";
                final long count = Long.parseLong(document.get(PersistenceConstants.FIELD_COUNT).toString());
                return new SearchNamespaceResultEntry(namespace, count);
            })
            .fold(new ArrayList<SearchNamespaceResultEntry>(), (list, entry) -> {
                list.add(entry);
                return list;
            })
            .<SearchNamespaceReportResult>map(SearchNamespaceReportResult::new);
}
Code example source: org.eclipse.ditto/ditto-services-gateway-util (also found in eclipse/ditto)
/**
 * Unfolds the entity into a string using akka streams.
 *
 * @param entity the entity to unfold.
 * @param materializer the materializer to run the unfold stream.
 * @return the string representation of the entity.
 */
public static String entityToString(final HttpEntity entity, final Materializer materializer) {
    requireNonNull(entity);
    requireNonNull(materializer);
    if (entity.isKnownEmpty()) {
        return "";
    }
    try {
        return entity.getDataBytes()
                .fold(ByteString.empty(), ByteString::concat)
                .map(ByteString::utf8String)
                .runWith(Sink.head(), materializer)
                .toCompletableFuture()
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (final InterruptedException | ExecutionException | TimeoutException e) {
        throw new IllegalStateException("Failed to read entity.", e);
    }
}
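A note on why this example concatenates all chunks before calling `utf8String`: chunk boundaries in a byte stream are arbitrary, so a multi-byte UTF-8 character may be split across two chunks. The sketch below illustrates the difference with plain JDK types, using byte arrays in place of Akka's `ByteString`. `ChunkDecodeSketch` and its two methods are hypothetical names introduced for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK illustration of decoding after versus before concatenation; not Akka code. */
public class ChunkDecodeSketch {

    /** Fold-style: append every chunk to one buffer, then decode once at the end. */
    public static String concatThenDecode(List<byte[]> chunks) {
        ByteArrayOutputStream acc = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            acc.write(chunk, 0, chunk.length);
        }
        return new String(acc.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Naive: decode each chunk on its own, then join the resulting strings. */
    public static String decodeEachChunk(List<byte[]> chunks) {
        StringBuilder sb = new StringBuilder();
        for (byte[] chunk : chunks) {
            sb.append(new String(chunk, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String text = "\u00e9"; // "e" with acute accent, encoded as two bytes in UTF-8
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);

        // Split the character's two bytes across two chunks.
        List<byte[]> chunks = Arrays.asList(
                Arrays.copyOfRange(bytes, 0, 1),
                Arrays.copyOfRange(bytes, 1, 2));

        System.out.println(concatThenDecode(chunks).equals(text)); // true
        System.out.println(decodeEachChunk(chunks).equals(text));  // false
    }
}
```

The trade-off is that the whole entity is buffered in memory before decoding, which is reasonable for small payloads but worth bounding for untrusted or large ones.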
Code example source: eclipse/ditto
private CompletableFuture<JsonArray> mapResponseToJsonArray(final HttpResponse response) {
    final CompletionStage<JsonObject> body =
            response.entity().getDataBytes().fold(ByteString.empty(), ByteString::concat)
                    .map(ByteString::utf8String)
                    .map(JsonFactory::readFrom)
                    .map(JsonValue::asObject)
                    .runWith(Sink.head(), httpClient.getActorMaterializer());
    final JsonPointer keysPointer = JsonPointer.of("keys");
    return body.toCompletableFuture()
            .thenApply(jsonObject -> jsonObject.getValue(keysPointer).map(JsonValue::asArray)
                    .orElseThrow(() -> new JsonMissingFieldException(keysPointer)))
            .exceptionally(t -> {
                throw new IllegalStateException("Failed to extract public keys from JSON response: " + body, t);
            });
}
Code example source: org.eclipse.ditto/ditto-services-thingsearch-persistence (also found in eclipse/ditto)
@Override
public final Source<Set<String>, NotUsed> getThingIdsForPolicy(final String policyId) {
    log.debug("Retrieving Thing ids for policy: <{}>", policyId);
    final Bson filter = eq(FIELD_POLICY_ID, policyId);
    return Source.fromPublisher(collection.find(filter)
            .projection(new BsonDocument(FIELD_ID, new BsonInt32(1))))
            .map(doc -> doc.getString(FIELD_ID))
            .fold(new HashSet<>(), (set, id) -> {
                set.add(id);
                return set;
            });
}
Code example source: eclipse/ditto
private Route handleDevOpsPerRequest(final RequestContext ctx,
        final Source<ByteString, ?> payloadSource,
        final Function<String, DevOpsCommand> requestJsonToCommandFunction) {
    final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
    payloadSource
            .fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::utf8String)
            .map(requestJsonToCommandFunction)
            .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
                    HttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);
    return completeWithFuture(httpResponseFuture);
}
Code example source: eclipse/ditto (also found in org.eclipse.ditto/ditto-services-thingsearch-persistence)
@Override
public Source<ResultList<String>, NotUsed> findAll(final PolicyRestrictedSearchAggregation aggregation) {
    checkNotNull(aggregation, "aggregation");
    final Source<Document, NotUsed> source = aggregation.execute(collection, maxQueryTime);
    return source.map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
            .fold(new ArrayList<String>(), (list, id) -> {
                list.add(id);
                return list;
            })
            .map(resultsPlusOne -> toResultList(resultsPlusOne, aggregation.getSkip(), aggregation.getLimit()))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("findAll");
}
Code example source: eclipse/ditto
private Route handleMessage(final RequestContext ctx, final Source<ByteString, Object> payloadSource,
        final Function<ByteBuffer, MessageCommand<?, ?>> requestPayloadToCommandFunction) {
    final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
    payloadSource.fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::toArray)
            .map(ByteBuffer::wrap)
            .map(requestPayloadToCommandFunction)
            .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
                    HttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);
    return completeWithFuture(preprocessResponse(httpResponseFuture));
}
Code example source: eclipse/ditto (also found in org.eclipse.ditto/ditto-services-thingsearch-persistence)
@Override
public Source<ResultList<String>, NotUsed> findAll(final Query query) {
    checkNotNull(query, "query");
    final BsonDocument queryFilter = getMongoFilter(query);
    if (log.isDebugEnabled()) {
        log.debug("findAll with query filter <{}>.", queryFilter);
    }
    final Bson filter = and(filterNotDeleted(), queryFilter);
    final Optional<Bson> sortOptions = Optional.of(getMongoSort(query));
    final int limit = query.getLimit();
    final int skip = query.getSkip();
    final Bson projection = new Document(PersistenceConstants.FIELD_ID, 1);
    return Source.fromPublisher(collection.find(filter, Document.class)
            .sort(sortOptions.orElse(null))
            .limit(limit + 1)
            .skip(skip)
            .projection(projection)
            .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS)
    )
            .map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
            .fold(new ArrayList<String>(), (list, id) -> {
                list.add(id);
                return list;
            })
            .map(resultsPlusOne -> toResultList(resultsPlusOne, skip, limit))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("findAll");
}
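The query above requests `limit + 1` documents and hands the folded list to `toResultList`, whose body is not shown in this listing. A common reason for fetching one extra row is to detect whether a further page exists without issuing a separate count query. The sketch below shows one way such a helper could work. The `Page` class, its fields, the `-1` sentinel, and this version of `toResultList` are assumptions made for illustration and are not Ditto's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative "fetch limit + 1" paging helper; names and behavior are assumptions. */
public class PagingSketch {

    /** Hypothetical result holder: the page items plus the next page's offset, or -1 if none. */
    public static final class Page {
        public final List<String> items;
        public final long nextPageOffset;

        Page(List<String> items, long nextPageOffset) {
            this.items = items;
            this.nextPageOffset = nextPageOffset;
        }
    }

    /** Trims the extra element if present and derives the next-page offset from it. */
    public static Page toResultList(List<String> resultsPlusOne, int skip, int limit) {
        if (resultsPlusOne.size() > limit) {
            // The (limit + 1)-th element proves that another page exists.
            List<String> items = new ArrayList<>(resultsPlusOne.subList(0, limit));
            return new Page(items, (long) skip + limit);
        }
        return new Page(new ArrayList<>(resultsPlusOne), -1L);
    }

    public static void main(String[] args) {
        // limit = 2, so the query fetched up to 3 ids.
        Page first = toResultList(Arrays.asList("a", "b", "c"), 0, 2);
        System.out.println(first.items + " next=" + first.nextPageOffset); // [a, b] next=2

        // Second page: only one id left, so there is no further page.
        Page last = toResultList(Arrays.asList("c"), 2, 2);
        System.out.println(last.items + " next=" + last.nextPageOffset); // [c] next=-1
    }
}
```

Because the fold has already gathered every id into one list, the trimming step runs exactly once per query, on the single element that the fold emits.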
Code example source: eclipse/ditto
private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
    final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
    Source.single(command)
            .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
                    HttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);
    final CompletionStage<HttpResponse> allThingsCountHttpResponse = Source.fromCompletionStage(httpResponseFuture)
            .flatMapConcat(httpResponse -> httpResponse.entity().getDataBytes())
            .fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::utf8String)
            .map(Integer::valueOf)
            .map(count -> JsonObject.newBuilder().set("allThingsCount", count).build())
            .map(jsonObject -> HttpResponse.create()
                    .withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(jsonObject.toString()))
                    .withStatus(HttpStatusCode.OK.toInt()))
            .runWith(Sink.head(), materializer);
    return completeWithFuture(allThingsCountHttpResponse);
}
Code example source: eclipse/ditto (two excerpts; the listing is truncated in the source)
        .fold(ByteString.empty(), ByteString::concat)
        .map(ByteString::utf8String)
        .map(requestJsonToCommandFunction)
// ...
final InputStream inputStream = response.entity()
        .getDataBytes()
        .fold(ByteString.empty(), ByteString::concat)
        .runWith(StreamConverters.asInputStream(), materializer);
final JsonValue jsonValue = JsonFactory.readFrom(new InputStreamReader(inputStream));
Code example source: eclipse/ditto
private Sink<Message, NotUsed> createSink(final Integer version, final String connectionCorrelationId,
        final AuthorizationContext connectionAuthContext, final DittoHeaders additionalHeaders,
        final ProtocolAdapter adapter) {
    return Flow.<Message>create()
            .filter(Message::isText)
            .map(Message::asTextMessage)
            .map(textMsg -> {
                if (textMsg.isStrict()) {
                    return Source.single(textMsg.getStrictText());
                } else {
                    return textMsg.getStreamedText();
                }
            })
            .flatMapConcat(textMsg -> textMsg.<String>fold("", (str1, str2) -> str1 + str2))
            .via(Flow.fromFunction(result -> {
                LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
                        logger.debug("Received incoming WebSocket message: {}", result));
                return result;
            }))
            .withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
                    Logging.WarningLevel()))
            .filter(strictText -> processProtocolMessage(connectionAuthContext, connectionCorrelationId,
                    strictText))
            .map(buildSignal(version, connectionCorrelationId, connectionAuthContext, additionalHeaders, adapter))
            .to(Sink.actorSubscriber(
                    CommandSubscriber.props(streamingActor, subscriberBackpressureQueueSize, eventStream)));
}