本文整理了Java中akka.stream.javadsl.Source.via()
方法的一些代码示例,展示了Source.via()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Source.via()
方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:via
暂无
代码示例来源:origin: com.typesafe.play/play_2.11
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> source = parts.map((part) -> {
if (part instanceof Http.MultipartFormData.DataPart) {
Http.MultipartFormData.DataPart dp = (Http.MultipartFormData.DataPart) part;
return (MultipartFormData.Part) new MultipartFormData.DataPart(dp.getKey(), dp.getValue());
} else if (part instanceof Http.MultipartFormData.FilePart) {
Http.MultipartFormData.FilePart fp = (Http.MultipartFormData.FilePart) part;
if (fp.ref instanceof Source) {
Source ref = (Source) fp.ref;
Option<String> ct = Option.apply(fp.getContentType());
return (MultipartFormData.Part)new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(fp.getKey(), fp.getFilename(), ct, ref.asScala(), fp.getFileSize(), fp.getDispositionType());
}
}
throw new UnsupportedOperationException("Unsupported Part Class");
});
return source.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}
代码示例来源:origin: com.typesafe.play/play
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> source = parts.map((part) -> {
if (part instanceof Http.MultipartFormData.DataPart) {
Http.MultipartFormData.DataPart dp = (Http.MultipartFormData.DataPart) part;
return (MultipartFormData.Part) new MultipartFormData.DataPart(dp.getKey(), dp.getValue());
} else if (part instanceof Http.MultipartFormData.FilePart) {
Http.MultipartFormData.FilePart fp = (Http.MultipartFormData.FilePart) part;
if (fp.ref instanceof Source) {
Source ref = (Source) fp.ref;
Option<String> ct = Option.apply(fp.getContentType());
return (MultipartFormData.Part)new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(fp.getKey(), fp.getFilename(), ct, ref.asScala(), fp.getFileSize(), fp.getDispositionType());
}
}
throw new UnsupportedOperationException("Unsupported Part Class");
});
return source.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}
代码示例来源:origin: com.typesafe.play/play_2.12
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> source = parts.map((part) -> {
if (part instanceof Http.MultipartFormData.DataPart) {
Http.MultipartFormData.DataPart dp = (Http.MultipartFormData.DataPart) part;
return (MultipartFormData.Part) new MultipartFormData.DataPart(dp.getKey(), dp.getValue());
} else if (part instanceof Http.MultipartFormData.FilePart) {
Http.MultipartFormData.FilePart fp = (Http.MultipartFormData.FilePart) part;
if (fp.ref instanceof Source) {
Source ref = (Source) fp.ref;
Option<String> ct = Option.apply(fp.getContentType());
return (MultipartFormData.Part)new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(fp.getKey(), fp.getFilename(), ct, ref.asScala(), fp.getFileSize(), fp.getDispositionType());
}
}
throw new UnsupportedOperationException("Unsupported Part Class");
});
return source.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}
代码示例来源:origin: eclipse/ditto
private <T> Source<T, NotUsed> processSearchPersistenceResult(final Supplier<Source<T, NotUsed>> resultSupplier,
final DittoHeaders dittoHeaders) {
final Source<T, NotUsed> source = resultSupplier.get();
final Flow<T, T, NotUsed> logAndFinishPersistenceSegmentFlow =
Flow.fromFunction(result -> {
// we know that the source provides exactly one ResultList
LogUtil.enhanceLogWithCorrelationId(log, dittoHeaders.getCorrelationId());
log.debug("Persistence returned: {}", result);
return result;
});
return source.<T, NotUsed>via(logAndFinishPersistenceSegmentFlow);
}
代码示例来源:origin: com.lightbend.lagom/lagom-javadsl-testkit
public <Event extends AggregateEvent<Event>> CompletionStage<Done> feed(Event e, Offset offset) {
AggregateEventTagger<Event> tag = e.aggregateTag();
List<Pair<ReadSideHandler<?>, Offset>> list = processors.get(tag.eventType());
if (list == null) {
throw new RuntimeException("No processor registered for Event " + tag.eventType().getCanonicalName());
}
List<CompletionStage<?>> stages = list.stream().map(pHandlerOffset -> {
@SuppressWarnings("unchecked") ReadSideHandler<Event> handler = (ReadSideHandler<Event>) pHandlerOffset.first();
Flow<Pair<Event, Offset>, Done, ?> flow = handler.handle();
return Source.single(Pair.create(e, offset)).via(flow).runWith(Sink.ignore(), materializer);
}
).collect(Collectors.toList());
return doAll(stages);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors
private CompletionStage<Boolean> updateThing(final Thing thing) {
final long currentSequenceNumber = thing.getRevision()
.map(ThingRevision::toLong)
.orElseThrow(() -> {
final String message = MessageFormat.format("The Thing <{0}> has no revision!", thingId);
return new IllegalArgumentException(message);
});
final StartedTimer timer =
DittoMetrics.expiringTimer(TRACE_THING_MODIFIED).tag(UPDATE_TYPE_TAG, "modified").build();
return circuitBreaker.callWithCircuitBreakerCS(
() -> searchUpdaterPersistence
.insertOrUpdate(thing, currentSequenceNumber, policyRevision)
.via(stopTimer(timer))
.runWith(Sink.last(), materializer));
}
代码示例来源:origin: eclipse/ditto
private CompletionStage<Boolean> updateThing(final Thing thing) {
final long currentSequenceNumber = thing.getRevision()
.map(ThingRevision::toLong)
.orElseThrow(() -> {
final String message = MessageFormat.format("The Thing <{0}> has no revision!", thingId);
return new IllegalArgumentException(message);
});
final StartedTimer timer =
DittoMetrics.expiringTimer(TRACE_THING_MODIFIED).tag(UPDATE_TYPE_TAG, "modified").build();
return circuitBreaker.callWithCircuitBreakerCS(
() -> searchUpdaterPersistence
.insertOrUpdate(thing, currentSequenceNumber, policyRevision)
.via(stopTimer(timer))
.runWith(Sink.last(), materializer));
}
代码示例来源:origin: com.lightbend.akka/akka-stream-alpakka-file
/**
* Java API: Read the entire contents of a file as text lines, and then when the end is reached,
* keep reading newly appended data. Like the unix command `tail -f`.
*
* <p>If a line is longer than `maxChunkSize` the stream will fail.
*
* <p>Aborting the stage can be done by combining with a [[akka.stream.KillSwitch]]
*
* @param path a file path to tail
* @param maxLineSize The max emitted size of the `ByteString`s
* @param pollingInterval When the end has been reached, look for new content with this interval
* @param lf The character or characters used as line separator
* @param charset The charset of the file
*/
public static Source<String, NotUsed> createLines(
Path path, int maxLineSize, FiniteDuration pollingInterval, String lf, Charset charset) {
return create(path, maxLineSize, 0, pollingInterval)
.via(Framing.delimiter(ByteString.fromString(lf, charset.name()), maxLineSize))
.map(bytes -> bytes.decodeString(charset));
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors
private void persistThingEvents(final List<ThingEvent> thingEvents, final long targetRevision) {
log.debug("Executing bulk write operation with <{}> updates.", thingEvents.size());
if (!thingEvents.isEmpty()) {
transactionActive = true;
Kamon.histogram(COUNT_THING_BULK_UPDATES_PER_BULK).record(thingEvents.size());
final StartedTimer bulkUpdate =
DittoMetrics.expiringTimer(TRACE_THING_BULK_UPDATE).tag(UPDATE_TYPE_TAG, "bulkUpdate").build();
circuitBreaker.callWithCircuitBreakerCS(() -> searchUpdaterPersistence
.executeCombinedWrites(thingId, thingEvents, policyEnforcer, targetRevision)
.via(stopTimer(bulkUpdate))
.runWith(Sink.last(), materializer)
.whenComplete(this::processWriteResult))
.exceptionally(t -> {
log.error(t, "There occurred an error while processing a write operation within the"
+ " circuit breaker for thing <{}>.", thingId);
//e.g. in case of a circuit breaker timeout, the running transaction must be stopped
processWriteResult(false, t);
return null;
});
}
}
代码示例来源:origin: eclipse/ditto
private void persistThingEvents(final List<ThingEvent> thingEvents, final long targetRevision) {
log.debug("Executing bulk write operation with <{}> updates.", thingEvents.size());
if (!thingEvents.isEmpty()) {
transactionActive = true;
Kamon.histogram(COUNT_THING_BULK_UPDATES_PER_BULK).record(thingEvents.size());
final StartedTimer bulkUpdate =
DittoMetrics.expiringTimer(TRACE_THING_BULK_UPDATE).tag(UPDATE_TYPE_TAG, "bulkUpdate").build();
circuitBreaker.callWithCircuitBreakerCS(() -> searchUpdaterPersistence
.executeCombinedWrites(thingId, thingEvents, policyEnforcer, targetRevision)
.via(stopTimer(bulkUpdate))
.runWith(Sink.last(), materializer)
.whenComplete(this::processWriteResult))
.exceptionally(t -> {
log.error(t, "There occurred an error while processing a write operation within the"
+ " circuit breaker for thing <{}>.", thingId);
//e.g. in case of a circuit breaker timeout, the running transaction must be stopped
processWriteResult(false, t);
return null;
});
}
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors
private void deleteThingFromSearchIndex() {
final StartedTimer timer =
DittoMetrics.expiringTimer(TRACE_THING_DELETE).tag(UPDATE_TYPE_TAG, "delete").build();
circuitBreaker.callWithCircuitBreakerCS(() -> searchUpdaterPersistence
.delete(thingId)
.via(stopTimer(timer))
.runWith(Sink.last(), materializer)
.whenComplete(this::handleDeletion));
}
代码示例来源:origin: eclipse/ditto
private void deleteThingFromSearchIndex() {
final StartedTimer timer =
DittoMetrics.expiringTimer(TRACE_THING_DELETE).tag(UPDATE_TYPE_TAG, "delete").build();
circuitBreaker.callWithCircuitBreakerCS(() -> searchUpdaterPersistence
.delete(thingId)
.via(stopTimer(timer))
.runWith(Sink.last(), materializer)
.whenComplete(this::handleDeletion));
}
代码示例来源:origin: eclipse/ditto
private Source<Message, NotUsed> createSource(final String connectionCorrelationId, final ProtocolAdapter adapter) {
return Source.<Jsonifiable.WithPredicate<JsonObject, JsonField>>actorPublisher(
EventAndResponsePublisher.props(publisherBackpressureBufferSize))
.mapMaterializedValue(actorRef -> {
streamingActor.tell(new Connect(actorRef, connectionCorrelationId, STREAMING_TYPE_WS), null);
return NotUsed.getInstance();
})
.map(this::publishResponsePublishedEvent)
.map(jsonifiableToString(adapter))
.via(Flow.fromFunction(result -> {
LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
logger.debug("Sending outgoing WebSocket message: {}", result));
return result;
}))
.map(TextMessage::create);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors
private CompletionStage<Boolean> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
if (policyEnforcer == null) {
log.warning("Enforcer was null when trying to update Policy search index - resyncing Policy!");
syncPolicy(thing);
return CompletableFuture.completedFuture(Boolean.FALSE);
}
final StartedTimer timer = DittoMetrics.expiringTimer(TRACE_POLICY_UPDATE).tag(UPDATE_TYPE_TAG, "update").build();
return circuitBreaker.callWithCircuitBreakerCS(() -> searchUpdaterPersistence
.updatePolicy(thing, policyEnforcer)
.via(stopTimer(timer))
.runWith(Sink.last(), materializer)
.whenComplete((isPolicyUpdated, throwable) -> {
if (null != throwable) {
log.error(throwable, "Failed to update policy because of an exception!");
} else if (!isPolicyUpdated) {
log.debug("The update operation for the policy of Thing <{}> did not have an effect, " +
"probably because it does not contain fine-grained policies!", thingId);
} else {
log.debug("Successfully updated policy.");
}
}));
}
代码示例来源:origin: eclipse/ditto
private CompletionStage<Boolean> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
if (policyEnforcer == null) {
log.warning("Enforcer was null when trying to update Policy search index - resyncing Policy!");
syncPolicy(thing);
return CompletableFuture.completedFuture(Boolean.FALSE);
}
final StartedTimer timer =
DittoMetrics.expiringTimer(TRACE_POLICY_UPDATE).tag(UPDATE_TYPE_TAG, "update").build();
return circuitBreaker.callWithCircuitBreakerCS(() -> searchUpdaterPersistence
.updatePolicy(thing, policyEnforcer)
.via(stopTimer(timer))
.runWith(Sink.last(), materializer)
.whenComplete((isPolicyUpdated, throwable) -> {
if (null != throwable) {
log.error(throwable, "Failed to update policy because of an exception!");
} else if (!isPolicyUpdated) {
log.debug("The update operation for the policy of Thing <{}> did not have an effect, " +
"probably because it does not contain fine-grained policies!", thingId);
} else {
log.debug("Successfully updated policy.");
}
}));
}
代码示例来源:origin: eclipse/ditto
() -> searchPersistence.findAll((PolicyRestrictedSearchAggregation) query),
dittoHeaders)
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
.via(Flow.fromFunction(result -> {
searchTimer.stop();
return result;
代码示例来源:origin: eclipse/ditto
() -> searchPersistence.count((PolicyRestrictedSearchAggregation) query),
dittoHeaders)
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
.via(Flow.fromFunction(result -> {
databaseAccessTimer.stop();
return result;
.via(Flow.fromFunction(result -> {
countTimer.stop();
return result;
内容来源于网络,如有侵权,请联系作者删除!