本文整理了Java中akka.stream.javadsl.Source
类的一些代码示例,展示了Source
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Source
类的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
暂无
代码示例来源:origin: com.typesafe.play/play_2.12
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> source = parts.map((part) -> {
if (part instanceof Http.MultipartFormData.DataPart) {
Http.MultipartFormData.DataPart dp = (Http.MultipartFormData.DataPart) part;
return (MultipartFormData.Part) new MultipartFormData.DataPart(dp.getKey(), dp.getValue());
} else if (part instanceof Http.MultipartFormData.FilePart) {
Http.MultipartFormData.FilePart fp = (Http.MultipartFormData.FilePart) part;
if (fp.ref instanceof Source) {
Source ref = (Source) fp.ref;
Option<String> ct = Option.apply(fp.getContentType());
return (MultipartFormData.Part)new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(fp.getKey(), fp.getFilename(), ct, ref.asScala(), fp.getFileSize(), fp.getDispositionType());
}
}
throw new UnsupportedOperationException("Unsupported Part Class");
});
return source.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}
代码示例来源:origin: com.typesafe.play/play_2.12
/**
* Set a Multipart Form url encoded body to this request saving it as a raw body.
*
* @param data the multipart-form parameters
* @param temporaryFileCreator the temporary file creator.
* @param mat a Akka Streams Materializer
* @return the modified builder
*/
public RequestBuilder bodyRaw(List<MultipartFormData.Part<Source<ByteString, ?>>> data, Files.TemporaryFileCreator temporaryFileCreator, Materializer mat) {
String boundary = MultipartFormatter.randomBoundary();
try {
ByteString materializedData = MultipartFormatter
.transform(Source.from(data), boundary)
.runWith(Sink.reduce(ByteString::concat), mat)
.toCompletableFuture()
.get();
play.api.mvc.RawBuffer buffer = new play.api.mvc.RawBuffer(materializedData.size(), temporaryFileCreator.asScala(), materializedData);
return body(new RequestBody(JavaParsers.toJavaRaw(buffer)), MultipartFormatter.boundaryToContentType(boundary));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failure while materializing Multipart/Form Data", e);
}
}
代码示例来源:origin: com.typesafe.play/play_2.12
@Override
public Source<ByteString, ?> dataStream() {
return Source.<ByteString>single(data);
}
代码示例来源:origin: eclipse/ditto
Source.fromCompletionStage(PatternsCS.ask(chosenQueryActor, countThings, QUERY_ASK_TIMEOUT))
.flatMapConcat(query -> {
LogUtil.enhanceLogWithCorrelationId(log, correlationIdOpt);
queryParsingTimer.stop();
() -> searchPersistence.count((PolicyRestrictedSearchAggregation) query),
dittoHeaders)
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
}))
.map(count -> CountThingsResponse.of(count, dittoHeaders));
} else if (query instanceof Query) {
final StartedTimer databaseAccessTimer =
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
}))
.map(count -> CountThingsResponse.of(count, dittoHeaders));
} else if (query instanceof DittoRuntimeException) {
log.info("QueryActor responded with DittoRuntimeException: {}", query);
return Source.<Object>failed((Throwable) query);
} else {
log.error("Expected 'PolicyRestrictedSearchAggregation', but got: {}", query);
return Source.<Object>single(CountThingsResponse.of(-1, dittoHeaders));
.via(Flow.fromFunction(result -> {
countTimer.stop();
代码示例来源:origin: eclipse/ditto
private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
Source.single(command)
.to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
HttpRequestActor.COMPLETE_MESSAGE))
.run(materializer);
final CompletionStage<HttpResponse> allThingsCountHttpResponse = Source.fromCompletionStage(httpResponseFuture)
.flatMapConcat(httpResponse -> httpResponse.entity().getDataBytes())
.fold(ByteString.empty(), ByteString::concat)
.map(ByteString::utf8String)
.map(Integer::valueOf)
.map(count -> JsonObject.newBuilder().set("allThingsCount", count).build())
.map(jsonObject -> HttpResponse.create()
.withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(jsonObject.toString()))
.withStatus(HttpStatusCode.OK.toInt()))
.runWith(Sink.head(), materializer);
return completeWithFuture(allThingsCountHttpResponse);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors
private CompletionStage<Boolean> updateThing(final Thing thing) {
final long currentSequenceNumber = thing.getRevision()
.map(ThingRevision::toLong)
.orElseThrow(() -> {
final String message = MessageFormat.format("The Thing <{0}> has no revision!", thingId);
return new IllegalArgumentException(message);
});
final StartedTimer timer =
DittoMetrics.expiringTimer(TRACE_THING_MODIFIED).tag(UPDATE_TYPE_TAG, "modified").build();
return circuitBreaker.callWithCircuitBreakerCS(
() -> searchUpdaterPersistence
.insertOrUpdate(thing, currentSequenceNumber, policyRevision)
.via(stopTimer(timer))
.runWith(Sink.last(), materializer));
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private CompletionStage<Optional<Throwable>> generateStatusResponse() {
final String id = UUID.randomUUID().toString();
return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
.flatMapConcat(s ->
Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
.map(DeleteResult::getDeletedCount)
)
)
.runWith(Sink.seq(), materializer)
.handle((result, error) -> {
if (error != null) {
return Optional.of(error);
} else if (!Objects.equals(result, Collections.singletonList(1L))) {
final String message = "Expect 1 document inserted and deleted. Found: " + result;
return Optional.of(new IllegalStateException(message));
} else {
return Optional.empty();
}
});
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence
private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
return Source.fromPublisher(collection.updateOne(filter, document))
.flatMapConcat(deleteResult -> {
if (deleteResult.getMatchedCount() <= 0) {
return Source.single(Boolean.FALSE);
}
final PolicyUpdate deletePolicyEntries = PolicyUpdateFactory.createDeleteThingUpdate(thingId);
final Bson policyIndexRemoveFilter = deletePolicyEntries.getPolicyIndexRemoveFilter();
return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
.map(r -> true);
});
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence
@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");
final Source<Document, NotUsed> source = policyRestrictedSearchAggregation.execute(collection, maxQueryTime);
return source.map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
.map(countResult -> (Number) countResult)
.map(Number::longValue) // use Number.longValue() to support both Integer and Long values
.orElse(Source.<Long>single(0L))
.mapError(handleMongoExecutionTimeExceededException())
.log("count");
}
代码示例来源:origin: joshlong/reactive-spring-online-training
Publisher<HashTag> hashtags() {
return Source
.fromPublisher(this.tweets())
.map(Tweet::getHashTags)
.reduce((a, b) -> {
Set<HashTag> tags = new HashSet<>();
tags.addAll(a);
tags.addAll(b);
return tags;
})
.mapConcat(param -> param)
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), this.actorMaterializer);
}
代码示例来源:origin: eclipse/ditto
@Override
public Source<NotUsed, NotUsed> updateLastSuccessfulStreamEnd(final Instant timestamp) {
final Date mongoStorableDate = Date.from(timestamp);
final Document toStore = new Document()
.append(FIELD_TIMESTAMP, mongoStorableDate);
return Source.fromPublisher(lastSuccessfulSearchSyncCollection.insertOne(toStore))
.map(success -> {
LOGGER.debug("Successfully inserted timestamp for search synchronization: <{}>.", timestamp);
return NotUsed.getInstance();
});
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-akka
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.matchAny(message -> {
if (message instanceof WithDittoHeaders) {
LogUtil.enhanceLogWithCorrelationId(log, (WithDittoHeaders<?>) message);
}
log.debug("Received message: <{}>.", message);
final WithSender wrapped = WithSender.of(message, getSender());
Source.single(wrapped).runWith(messageHandler, materializer);
})
.build();
}
}
代码示例来源:origin: com.lightbend.lagom/lagom-javadsl-testkit
public <Event extends AggregateEvent<Event>> CompletionStage<Done> feed(Event e, Offset offset) {
AggregateEventTagger<Event> tag = e.aggregateTag();
List<Pair<ReadSideHandler<?>, Offset>> list = processors.get(tag.eventType());
if (list == null) {
throw new RuntimeException("No processor registered for Event " + tag.eventType().getCanonicalName());
}
List<CompletionStage<?>> stages = list.stream().map(pHandlerOffset -> {
@SuppressWarnings("unchecked") ReadSideHandler<Event> handler = (ReadSideHandler<Event>) pHandlerOffset.first();
Flow<Pair<Event, Offset>, Done, ?> flow = handler.handle();
return Source.single(Pair.create(e, offset)).via(flow).runWith(Sink.ignore(), materializer);
}
).collect(Collectors.toList());
return doAll(stages);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
return getIndicesExceptDefaultIndex(collectionName)
.flatMapConcat(existingIndices -> {
LOGGER.info("Drop undefined indices - Existing indices are: {}", existingIndices);
final List<String> indicesToDrop = getUndefinedIndexNames(existingIndices, definedIndices);
LOGGER.info("Dropping undefined indices: {}", indicesToDrop);
return dropIndices(collectionName, indicesToDrop);
})
.runWith(Sink.ignore(), materializer);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence
private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
.limit(1)
.flatMapConcat(doc -> {
final Date date = doc.getDate(FIELD_TIMESTAMP);
final Instant timestamp = date.toInstant();
LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", timestamp);
return Source.single(Optional.of(timestamp));
})
.orElse(Source.single(Optional.empty()));
}
代码示例来源:origin: apptik/RHub
@Override
@SuppressWarnings("unchecked")
protected Publisher hide(Processor processor) {
return (Publisher) Source
.fromPublisher(processor)
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}
代码示例来源:origin: eclipse/ditto
private static Graph<SourceShape<WithSender>, NotUsed> keepResultAndLogErrors(final Object result) {
if (result instanceof WithSender) {
return Source.single((WithSender) result);
} else if (result instanceof DittoRuntimeException) {
return Source.single(result)
.log("PreEnforcer replied DittoRuntimeException")
.withAttributes(INFO_LEVEL)
.flatMapConcat(x -> Source.empty());
} else {
return Source.single(result)
.log("PreEnforcer encountered unexpected exception")
.withAttributes(ERROR_LEVEL)
.flatMapConcat(x -> Source.empty());
}
}
代码示例来源:origin: akarnokd/akarnokd-misc
public static void main(String[] args) throws Exception {
Config cfg = ConfigFactory.parseResources(AkkaRange.class, "/akka-streams.conf").resolve();
ActorSystem actorSystem = ActorSystem.create("sys", cfg);
ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
Source<Integer, NotUsed> source = Source.repeat(1)
.map(v -> v + 1);
Publisher<Integer> p = source.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
p.subscribe(println());
Thread.sleep(1000);
actorSystem.terminate();
}
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence
@Override
public final Source<Set<String>, NotUsed> getThingIdsForPolicy(final String policyId) {
log.debug("Retrieving Thing ids for policy: <{}>", policyId);
final Bson filter = eq(FIELD_POLICY_ID, policyId);
return Source.fromPublisher(collection.find(filter)
.projection(new BsonDocument(FIELD_ID, new BsonInt32(1))))
.map(doc -> doc.getString(FIELD_ID))
.fold(new HashSet<>(), (set, id) -> {
set.add(id);
return set;
});
}
代码示例来源:origin: com.typesafe.play/play_2.11
/**
* Convert the given source of ByteStrings to a chunked entity.
*
* @param data The source.
* @param contentType The optional content type.
* @return The ByteStrings.
*/
public static final HttpEntity chunked(Source<ByteString, ?> data, Optional<String> contentType) {
return new Chunked(data.map(HttpChunk.Chunk::new), contentType);
}
内容来源于网络,如有侵权,请联系作者删除!