akka.stream.javadsl.Source.via()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(13.5k)|赞(0)|评价(0)|浏览(171)

本文整理了Java中akka.stream.javadsl.Source.via()方法的一些代码示例,展示了Source.via()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,希望能在一定程度上帮助到你。Source.via()方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:via

Source.via介绍

via()将给定的处理阶段(Flow)追加到当前Source之后,返回一个新的Source;新Source发出的元素即为经过该Flow处理后的元素。

代码示例

代码示例来源:origin: com.typesafe.play/play_2.11

/**
 * Converts a Java-API stream of multipart parts into the stream produced by
 * {@code Multipart.format}.
 *
 * <p>Every element is mapped to its {@code MultipartFormData} counterpart: a data part is copied
 * by key and value; a file part is copied only when its {@code ref} is a {@code Source}, which is
 * converted with {@code asScala()}. For any other element the mapping function throws an
 * {@link UnsupportedOperationException}.
 *
 * @param parts the parts to convert
 * @param boundary the boundary passed to {@code Multipart.format}
 * @return the mapped parts piped through {@code Multipart.format} using the platform default
 *     charset and a chunk size argument of 4096
 */
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
  Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> converted = parts.map(element -> {
    if (element instanceof Http.MultipartFormData.DataPart) {
      Http.MultipartFormData.DataPart dataPart = (Http.MultipartFormData.DataPart) element;
      return (MultipartFormData.Part) new MultipartFormData.DataPart(dataPart.getKey(), dataPart.getValue());
    }
    if (element instanceof Http.MultipartFormData.FilePart) {
      Http.MultipartFormData.FilePart filePart = (Http.MultipartFormData.FilePart) element;
      if (filePart.ref instanceof Source) {
        Source javaSource = (Source) filePart.ref;
        Option<String> contentType = Option.apply(filePart.getContentType());
        return (MultipartFormData.Part) new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(
            filePart.getKey(), filePart.getFilename(), contentType, javaSource.asScala(), filePart.getFileSize(), filePart.getDispositionType());
      }
    }
    // Reached for unknown part classes and for file parts whose ref is not a Source.
    throw new UnsupportedOperationException("Unsupported Part Class");
  });
  return converted.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}

代码示例来源:origin: com.typesafe.play/play

/**
 * Converts a Java-API stream of multipart parts into the stream produced by
 * {@code Multipart.format}.
 *
 * <p>Every element is mapped to its {@code MultipartFormData} counterpart: a data part is copied
 * by key and value; a file part is copied only when its {@code ref} is a {@code Source}, which is
 * converted with {@code asScala()}. For any other element the mapping function throws an
 * {@link UnsupportedOperationException}.
 *
 * @param parts the parts to convert
 * @param boundary the boundary passed to {@code Multipart.format}
 * @return the mapped parts piped through {@code Multipart.format} using the platform default
 *     charset and a chunk size argument of 4096
 */
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
  Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> converted = parts.map(element -> {
    if (element instanceof Http.MultipartFormData.DataPart) {
      Http.MultipartFormData.DataPart dataPart = (Http.MultipartFormData.DataPart) element;
      return (MultipartFormData.Part) new MultipartFormData.DataPart(dataPart.getKey(), dataPart.getValue());
    }
    if (element instanceof Http.MultipartFormData.FilePart) {
      Http.MultipartFormData.FilePart filePart = (Http.MultipartFormData.FilePart) element;
      if (filePart.ref instanceof Source) {
        Source javaSource = (Source) filePart.ref;
        Option<String> contentType = Option.apply(filePart.getContentType());
        return (MultipartFormData.Part) new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(
            filePart.getKey(), filePart.getFilename(), contentType, javaSource.asScala(), filePart.getFileSize(), filePart.getDispositionType());
      }
    }
    // Reached for unknown part classes and for file parts whose ref is not a Source.
    throw new UnsupportedOperationException("Unsupported Part Class");
  });
  return converted.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}

代码示例来源:origin: com.typesafe.play/play_2.12

/**
 * Converts a Java-API stream of multipart parts into the stream produced by
 * {@code Multipart.format}.
 *
 * <p>Every element is mapped to its {@code MultipartFormData} counterpart: a data part is copied
 * by key and value; a file part is copied only when its {@code ref} is a {@code Source}, which is
 * converted with {@code asScala()}. For any other element the mapping function throws an
 * {@link UnsupportedOperationException}.
 *
 * @param parts the parts to convert
 * @param boundary the boundary passed to {@code Multipart.format}
 * @return the mapped parts piped through {@code Multipart.format} using the platform default
 *     charset and a chunk size argument of 4096
 */
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
  Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> converted = parts.map(element -> {
    if (element instanceof Http.MultipartFormData.DataPart) {
      Http.MultipartFormData.DataPart dataPart = (Http.MultipartFormData.DataPart) element;
      return (MultipartFormData.Part) new MultipartFormData.DataPart(dataPart.getKey(), dataPart.getValue());
    }
    if (element instanceof Http.MultipartFormData.FilePart) {
      Http.MultipartFormData.FilePart filePart = (Http.MultipartFormData.FilePart) element;
      if (filePart.ref instanceof Source) {
        Source javaSource = (Source) filePart.ref;
        Option<String> contentType = Option.apply(filePart.getContentType());
        return (MultipartFormData.Part) new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(
            filePart.getKey(), filePart.getFilename(), contentType, javaSource.asScala(), filePart.getFileSize(), filePart.getDispositionType());
      }
    }
    // Reached for unknown part classes and for file parts whose ref is not a Source.
    throw new UnsupportedOperationException("Unsupported Part Class");
  });
  return converted.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}

代码示例来源:origin: eclipse/ditto

/**
 * Obtains a source from the supplier and appends a pass-through stage that logs every element.
 *
 * @param resultSupplier provides the source to decorate; invoked exactly once, immediately
 * @param dittoHeaders headers whose correlation id is attached to the log before each debug entry
 * @param <T> the element type of the source
 * @return the supplied source followed by the logging stage; elements are passed on unchanged
 */
private <T> Source<T, NotUsed> processSearchPersistenceResult(final Supplier<Source<T, NotUsed>> resultSupplier,
    final DittoHeaders dittoHeaders) {
  final Source<T, NotUsed> persistenceSource = resultSupplier.get();
  // NOTE(review): the original author states the source emits exactly one ResultList - not
  // verifiable from this method alone.
  final Flow<T, T, NotUsed> loggingFlow = Flow.fromFunction(persistenceResult -> {
    LogUtil.enhanceLogWithCorrelationId(log, dittoHeaders.getCorrelationId());
    log.debug("Persistence returned: {}", persistenceResult);
    return persistenceResult;
  });
  return persistenceSource.via(loggingFlow);
}

代码示例来源:origin: com.lightbend.lagom/lagom-javadsl-testkit

/**
 * Feeds a single event, paired with its offset, to every handler registered for the event's
 * aggregate tag type.
 *
 * <p>For each registered handler a one-element stream is run through the flow returned by
 * {@code handler.handle()} and drained with {@code Sink.ignore()}.
 *
 * @param e the event to feed
 * @param offset the offset paired with the event
 * @return the result of {@code doAll} applied to the completion stages of all started streams
 * @throws RuntimeException if no processors are registered for the event's tag type
 */
public <Event extends AggregateEvent<Event>> CompletionStage<Done> feed(Event e, Offset offset) {
 AggregateEventTagger<Event> tagger = e.aggregateTag();
 List<Pair<ReadSideHandler<?>, Offset>> registered = processors.get(tagger.eventType());
 if (registered == null) {
  throw new RuntimeException("No processor registered for Event " + tagger.eventType().getCanonicalName());
 }
 List<CompletionStage<?>> stages = registered.stream()
   .map(entry -> {
    @SuppressWarnings("unchecked")
    ReadSideHandler<Event> typedHandler = (ReadSideHandler<Event>) entry.first();
    Flow<Pair<Event, Offset>, Done, ?> handlerFlow = typedHandler.handle();
    return Source.single(Pair.create(e, offset))
      .via(handlerFlow)
      .runWith(Sink.ignore(), materializer);
   })
   .collect(Collectors.toList());
 return doAll(stages);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors

/**
 * Inserts or updates the given Thing via {@code searchUpdaterPersistence}, guarded by the
 * circuit breaker.
 *
 * @param thing the Thing to write; it must carry a revision
 * @return the stage returned by the circuit breaker for the last element of the persistence stream
 * @throws IllegalArgumentException if the Thing has no revision
 */
private CompletionStage<Boolean> updateThing(final Thing thing) {
  final long revision = thing.getRevision()
      .map(ThingRevision::toLong)
      .orElseThrow(() -> new IllegalArgumentException(
          MessageFormat.format("The Thing <{0}> has no revision!", thingId)));
  // The timer is created before the circuit breaker call; stopTimer(...) is appended to the
  // persistence stream (presumably stopping the timer once the write result passes through).
  final StartedTimer updateTimer = DittoMetrics.expiringTimer(TRACE_THING_MODIFIED)
      .tag(UPDATE_TYPE_TAG, "modified")
      .build();
  return circuitBreaker.callWithCircuitBreakerCS(() ->
      searchUpdaterPersistence.insertOrUpdate(thing, revision, policyRevision)
          .via(stopTimer(updateTimer))
          .runWith(Sink.last(), materializer));
}

代码示例来源:origin: eclipse/ditto

/**
 * Inserts or updates the given Thing via {@code searchUpdaterPersistence}, guarded by the
 * circuit breaker.
 *
 * @param thing the Thing to write; it must carry a revision
 * @return the stage returned by the circuit breaker for the last element of the persistence stream
 * @throws IllegalArgumentException if the Thing has no revision
 */
private CompletionStage<Boolean> updateThing(final Thing thing) {
  final long revision = thing.getRevision()
      .map(ThingRevision::toLong)
      .orElseThrow(() -> new IllegalArgumentException(
          MessageFormat.format("The Thing <{0}> has no revision!", thingId)));
  // The timer is created before the circuit breaker call; stopTimer(...) is appended to the
  // persistence stream (presumably stopping the timer once the write result passes through).
  final StartedTimer updateTimer = DittoMetrics.expiringTimer(TRACE_THING_MODIFIED)
      .tag(UPDATE_TYPE_TAG, "modified")
      .build();
  return circuitBreaker.callWithCircuitBreakerCS(() ->
      searchUpdaterPersistence.insertOrUpdate(thing, revision, policyRevision)
          .via(stopTimer(updateTimer))
          .runWith(Sink.last(), materializer));
}

代码示例来源:origin: com.lightbend.akka/akka-stream-alpakka-file

/**
 * Java API: Emits the entire contents of a file as text lines and, once the end has been reached,
 * keeps emitting newly appended data, like the unix command {@code tail -f}.
 *
 * <p>The stream fails if a line is longer than {@code maxLineSize}.
 *
 * <p>The stage can be aborted by combining it with an {@code akka.stream.KillSwitch}.
 *
 * @param path a file path to tail
 * @param maxLineSize the maximum size of an emitted line; also passed to {@code create} as the
 *     chunk size and to the delimiter framing as the maximum frame length
 * @param pollingInterval when the end has been reached, look for new content with this interval
 * @param lf the character or characters used as line separator
 * @param charset the charset used to encode the separator and to decode each line
 * @return a source of decoded lines, read from position 0 of the file
 */
public static Source<String, NotUsed> createLines(
  Path path, int maxLineSize, FiniteDuration pollingInterval, String lf, Charset charset) {
 final ByteString lineSeparator = ByteString.fromString(lf, charset.name());
 return create(path, maxLineSize, 0, pollingInterval)
   .via(Framing.delimiter(lineSeparator, maxLineSize))
   .map(line -> line.decodeString(charset));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors

/**
 * Persists the given events as one combined write, guarded by the circuit breaker.
 *
 * <p>Nothing beyond the debug log entry happens for an empty list. Otherwise
 * {@code transactionActive} is set, the batch size is recorded in a histogram, and the write
 * outcome is handed to {@code processWriteResult}.
 *
 * @param thingEvents the events to persist
 * @param targetRevision the revision passed on to {@code executeCombinedWrites}
 */
private void persistThingEvents(final List<ThingEvent> thingEvents, final long targetRevision) {
  log.debug("Executing bulk write operation with <{}> updates.", thingEvents.size());
  if (thingEvents.isEmpty()) {
    return;
  }

  transactionActive = true;
  Kamon.histogram(COUNT_THING_BULK_UPDATES_PER_BULK).record(thingEvents.size());
  final StartedTimer bulkTimer = DittoMetrics.expiringTimer(TRACE_THING_BULK_UPDATE)
      .tag(UPDATE_TYPE_TAG, "bulkUpdate")
      .build();
  circuitBreaker
      .callWithCircuitBreakerCS(() -> searchUpdaterPersistence
          .executeCombinedWrites(thingId, thingEvents, policyEnforcer, targetRevision)
          .via(stopTimer(bulkTimer))
          .runWith(Sink.last(), materializer)
          .whenComplete(this::processWriteResult))
      .exceptionally(failure -> {
        log.error(failure, "There occurred an error while processing a write operation within the"
            + " circuit breaker for thing <{}>.", thingId);
        // E.g. on a circuit breaker timeout the running transaction must be stopped.
        processWriteResult(false, failure);
        return null;
      });
}

代码示例来源:origin: eclipse/ditto

/**
 * Persists the given events as one combined write, guarded by the circuit breaker.
 *
 * <p>Nothing beyond the debug log entry happens for an empty list. Otherwise
 * {@code transactionActive} is set, the batch size is recorded in a histogram, and the write
 * outcome is handed to {@code processWriteResult}.
 *
 * @param thingEvents the events to persist
 * @param targetRevision the revision passed on to {@code executeCombinedWrites}
 */
private void persistThingEvents(final List<ThingEvent> thingEvents, final long targetRevision) {
  log.debug("Executing bulk write operation with <{}> updates.", thingEvents.size());
  if (thingEvents.isEmpty()) {
    return;
  }

  transactionActive = true;
  Kamon.histogram(COUNT_THING_BULK_UPDATES_PER_BULK).record(thingEvents.size());
  final StartedTimer bulkTimer = DittoMetrics.expiringTimer(TRACE_THING_BULK_UPDATE)
      .tag(UPDATE_TYPE_TAG, "bulkUpdate")
      .build();
  circuitBreaker
      .callWithCircuitBreakerCS(() -> searchUpdaterPersistence
          .executeCombinedWrites(thingId, thingEvents, policyEnforcer, targetRevision)
          .via(stopTimer(bulkTimer))
          .runWith(Sink.last(), materializer)
          .whenComplete(this::processWriteResult))
      .exceptionally(failure -> {
        log.error(failure, "There occurred an error while processing a write operation within the"
            + " circuit breaker for thing <{}>.", thingId);
        // E.g. on a circuit breaker timeout the running transaction must be stopped.
        processWriteResult(false, failure);
        return null;
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors

/**
 * Deletes the Thing identified by {@code thingId} via {@code searchUpdaterPersistence}, guarded
 * by the circuit breaker, and passes the outcome to {@code handleDeletion}.
 */
private void deleteThingFromSearchIndex() {
  final StartedTimer deletionTimer = DittoMetrics.expiringTimer(TRACE_THING_DELETE)
      .tag(UPDATE_TYPE_TAG, "delete")
      .build();
  circuitBreaker.callWithCircuitBreakerCS(() ->
      searchUpdaterPersistence.delete(thingId)
          .via(stopTimer(deletionTimer))
          .runWith(Sink.last(), materializer)
          .whenComplete(this::handleDeletion));
}

代码示例来源:origin: eclipse/ditto

/**
 * Deletes the Thing identified by {@code thingId} via {@code searchUpdaterPersistence}, guarded
 * by the circuit breaker, and passes the outcome to {@code handleDeletion}.
 */
private void deleteThingFromSearchIndex() {
  final StartedTimer deletionTimer = DittoMetrics.expiringTimer(TRACE_THING_DELETE)
      .tag(UPDATE_TYPE_TAG, "delete")
      .build();
  circuitBreaker.callWithCircuitBreakerCS(() ->
      searchUpdaterPersistence.delete(thingId)
          .via(stopTimer(deletionTimer))
          .runWith(Sink.last(), materializer)
          .whenComplete(this::handleDeletion));
}

代码示例来源:origin: eclipse/ditto

/**
 * Builds the source of outgoing WebSocket messages for one connection.
 *
 * <p>When the stream is materialized, the publisher actor is announced to {@code streamingActor}
 * with a {@code Connect} message. Each element is then passed through
 * {@code publishResponsePublishedEvent}, converted by {@code jsonifiableToString(adapter)},
 * logged at debug level and wrapped with {@code TextMessage.create}.
 *
 * @param connectionCorrelationId correlation id used in the {@code Connect} message and for logging
 * @param adapter the protocol adapter handed to {@code jsonifiableToString}
 * @return the source of messages
 */
private Source<Message, NotUsed> createSource(final String connectionCorrelationId, final ProtocolAdapter adapter) {
  return Source.<Jsonifiable.WithPredicate<JsonObject, JsonField>>actorPublisher(
          EventAndResponsePublisher.props(publisherBackpressureBufferSize))
      .mapMaterializedValue(publisherRef -> {
        // The Connect message is sent with a null sender.
        streamingActor.tell(new Connect(publisherRef, connectionCorrelationId, STREAMING_TYPE_WS), null);
        return NotUsed.getInstance();
      })
      .map(this::publishResponsePublishedEvent)
      .map(jsonifiableToString(adapter))
      .via(Flow.fromFunction(outgoing -> {
        LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId,
            correlatedLogger -> correlatedLogger.debug("Sending outgoing WebSocket message: {}", outgoing));
        return outgoing;
      }))
      .map(TextMessage::create);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors

/**
 * Updates the policy data of the given Thing via {@code searchUpdaterPersistence}, guarded by
 * the circuit breaker, and logs the outcome.
 *
 * @param thing the Thing whose policy data is updated
 * @param policyEnforcer the enforcer to apply; when {@code null}, {@code syncPolicy(thing)} is
 *     called instead and no update is attempted
 * @return a stage already completed with {@code false} when the enforcer is {@code null};
 *     otherwise the stage returned by the circuit breaker
 */
private CompletionStage<Boolean> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
  if (null == policyEnforcer) {
    log.warning("Enforcer was null when trying to update Policy search index - resyncing Policy!");
    syncPolicy(thing);
    return CompletableFuture.completedFuture(Boolean.FALSE);
  }

  final StartedTimer policyTimer = DittoMetrics.expiringTimer(TRACE_POLICY_UPDATE)
      .tag(UPDATE_TYPE_TAG, "update")
      .build();
  return circuitBreaker.callWithCircuitBreakerCS(() ->
      searchUpdaterPersistence.updatePolicy(thing, policyEnforcer)
          .via(stopTimer(policyTimer))
          .runWith(Sink.last(), materializer)
          .whenComplete((updated, failure) -> {
            if (failure != null) {
              log.error(failure, "Failed to update policy because of an exception!");
              return;
            }
            if (updated) {
              log.debug("Successfully updated policy.");
            } else {
              log.debug("The update operation for the policy of Thing <{}> did not have an effect, " +
                  "probably because it does not contain fine-grained policies!", thingId);
            }
          }));
}

代码示例来源:origin: eclipse/ditto

/**
 * Updates the policy data of the given Thing via {@code searchUpdaterPersistence}, guarded by
 * the circuit breaker, and logs the outcome.
 *
 * @param thing the Thing whose policy data is updated
 * @param policyEnforcer the enforcer to apply; when {@code null}, {@code syncPolicy(thing)} is
 *     called instead and no update is attempted
 * @return a stage already completed with {@code false} when the enforcer is {@code null};
 *     otherwise the stage returned by the circuit breaker
 */
private CompletionStage<Boolean> updatePolicy(final Thing thing, final Enforcer policyEnforcer) {
  if (null == policyEnforcer) {
    log.warning("Enforcer was null when trying to update Policy search index - resyncing Policy!");
    syncPolicy(thing);
    return CompletableFuture.completedFuture(Boolean.FALSE);
  }

  final StartedTimer policyTimer = DittoMetrics.expiringTimer(TRACE_POLICY_UPDATE)
      .tag(UPDATE_TYPE_TAG, "update")
      .build();
  return circuitBreaker.callWithCircuitBreakerCS(() ->
      searchUpdaterPersistence.updatePolicy(thing, policyEnforcer)
          .via(stopTimer(policyTimer))
          .runWith(Sink.last(), materializer)
          .whenComplete((updated, failure) -> {
            if (failure != null) {
              log.error(failure, "Failed to update policy because of an exception!");
              return;
            }
            if (updated) {
              log.debug("Successfully updated policy.");
            } else {
              log.debug("The update operation for the policy of Thing <{}> did not have an effect, " +
                  "probably because it does not contain fine-grained policies!", thingId);
            }
          }));
}

代码示例来源:origin: eclipse/ditto

// NOTE(review): truncated excerpt - the enclosing call and the closing "}))" of each lambda are
// not shown. Each visible Flow.fromFunction stage stops a timer (databaseAccessTimer or
// searchTimer) and returns the element unchanged.
() -> searchPersistence.findAll((PolicyRestrictedSearchAggregation) query),
        dittoHeaders)
        .via(Flow.fromFunction(result -> {
          databaseAccessTimer.stop();
          return result;
        .via(Flow.fromFunction(result -> {
          databaseAccessTimer.stop();
          return result;
.via(Flow.fromFunction(result -> {
  searchTimer.stop();
  return result;

代码示例来源:origin: eclipse/ditto

// NOTE(review): truncated excerpt - the enclosing call and the closing "}))" of each lambda are
// not shown. Each visible Flow.fromFunction stage stops a timer (databaseAccessTimer or
// countTimer) and returns the element unchanged.
() -> searchPersistence.count((PolicyRestrictedSearchAggregation) query),
        dittoHeaders)
        .via(Flow.fromFunction(result -> {
          databaseAccessTimer.stop();
          return result;
        .via(Flow.fromFunction(result -> {
          databaseAccessTimer.stop();
          return result;
.via(Flow.fromFunction(result -> {
  countTimer.stop();
  return result;

相关文章