akka.stream.javadsl.Source类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(12.7k)|赞(0)|评价(0)|浏览(195)

本文整理了Java中akka.stream.javadsl.Source类的一些代码示例,展示了Source类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Source类的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source

Source介绍

暂无

代码示例

代码示例来源:origin: com.typesafe.play/play_2.12

/**
 * Converts a stream of Java multipart parts into the multipart-formatted byte stream.
 *
 * <p>Each {@code Http.MultipartFormData.DataPart} is mapped to a {@code MultipartFormData.DataPart};
 * each {@code Http.MultipartFormData.FilePart} whose {@code ref} is a {@link Source} is mapped to a
 * {@code MultipartFormData.FilePart} wrapping the Scala view of that source. The mapped parts are then
 * passed through {@code Multipart.format} using the platform default charset and a chunk size of 4096.
 *
 * @param parts the parts to encode
 * @param boundary the multipart boundary string
 * @return the formatted bytes
 * @throws UnsupportedOperationException (when the stream runs) for any part that is neither a data part
 *     nor a file part backed by a {@link Source}
 */
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
  Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> scalaParts = parts.map((javaPart) -> {
    if (javaPart instanceof Http.MultipartFormData.DataPart) {
      Http.MultipartFormData.DataPart dataPart = (Http.MultipartFormData.DataPart) javaPart;
      return (MultipartFormData.Part) new MultipartFormData.DataPart(dataPart.getKey(), dataPart.getValue());
    }
    if (javaPart instanceof Http.MultipartFormData.FilePart) {
      Http.MultipartFormData.FilePart filePart = (Http.MultipartFormData.FilePart) javaPart;
      if (filePart.ref instanceof Source) {
        Source javaRef = (Source) filePart.ref;
        Option<String> contentType = Option.apply(filePart.getContentType());
        return (MultipartFormData.Part) new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(
            filePart.getKey(),
            filePart.getFilename(),
            contentType,
            javaRef.asScala(),
            filePart.getFileSize(),
            filePart.getDispositionType());
      }
    }
    // Reached for unknown part classes and for file parts whose ref is not a Source.
    throw new UnsupportedOperationException("Unsupported Part Class");
  });
  return scalaParts.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}

代码示例来源:origin: com.typesafe.play/play_2.12

/**
 * Set a Multipart Form url encoded body to this request saving it as a raw body.
 *
 * <p>The multipart stream is materialized eagerly: this method blocks the calling thread until all
 * parts have been concatenated into a single {@code ByteString}.
 *
 * @param data the multipart-form parameters
 * @param temporaryFileCreator the temporary file creator.
 * @param mat an Akka Streams Materializer
 * @return the modified builder
 * @throws RuntimeException if materializing the multipart data fails or the waiting thread is
 *     interrupted; the original exception is attached as the cause
 */
public RequestBuilder bodyRaw(List<MultipartFormData.Part<Source<ByteString, ?>>> data, Files.TemporaryFileCreator temporaryFileCreator, Materializer mat) {
  String boundary = MultipartFormatter.randomBoundary();
  try {
    ByteString materializedData = MultipartFormatter
        .transform(Source.from(data), boundary)
        .runWith(Sink.reduce(ByteString::concat), mat)
        .toCompletableFuture()
        .get();
    play.api.mvc.RawBuffer buffer = new play.api.mvc.RawBuffer(materializedData.size(), temporaryFileCreator.asScala(), materializedData);
    return body(new RequestBody(JavaParsers.toJavaRaw(buffer)), MultipartFormatter.boundaryToContentType(boundary));
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Failure while materializing Multipart/Form Data", e);
  } catch (ExecutionException e) {
    throw new RuntimeException("Failure while materializing Multipart/Form Data", e);
  }
}

代码示例来源:origin: com.typesafe.play/play_2.12

/**
 * Exposes the body as a stream that emits the {@code data} field as its single element.
 */
@Override
public Source<ByteString, ?> dataStream() {
  final Source<ByteString, ?> singleChunk = Source.<ByteString>single(data);
  return singleChunk;
}

代码示例来源:origin: eclipse/ditto

// NOTE(review): this excerpt is abridged — statements and closing braces are missing (for example the
// call that should enclose the supplier lambda below), so the fragment does not compile as shown.
// Visible intent: ask the query actor, then turn its reply into a stream whose shape depends on the
// reply's runtime type.
Source.fromCompletionStage(PatternsCS.ask(chosenQueryActor, countThings, QUERY_ASK_TIMEOUT))
    .flatMapConcat(query -> {
      LogUtil.enhanceLogWithCorrelationId(log, correlationIdOpt);
      queryParsingTimer.stop();
            () -> searchPersistence.count((PolicyRestrictedSearchAggregation) query),
            dittoHeaders)
            .via(Flow.fromFunction(result -> {
              databaseAccessTimer.stop();
              return result;
            }))
            .map(count -> CountThingsResponse.of(count, dittoHeaders));
      } else if (query instanceof Query) {
        final StartedTimer databaseAccessTimer =
            .via(Flow.fromFunction(result -> {
              databaseAccessTimer.stop();
              return result;
            }))
            .map(count -> CountThingsResponse.of(count, dittoHeaders));
      } else if (query instanceof DittoRuntimeException) {
        // The reply itself is the exception: log it and fail the stream with it.
        log.info("QueryActor responded with DittoRuntimeException: {}", query);
        return Source.<Object>failed((Throwable) query);
      } else {
        // Any other reply type: log an error and answer with a count of -1.
        log.error("Expected 'PolicyRestrictedSearchAggregation', but got: {}", query);
        return Source.<Object>single(CountThingsResponse.of(-1, dittoHeaders));
    .via(Flow.fromFunction(result -> {
      countTimer.stop();

代码示例来源:origin: eclipse/ditto

/**
 * Sends the command to the actor returned by {@code createHttpPerRequestActor} and completes the route
 * with a JSON response of the form {@code {"allThingsCount": <count>}}.
 *
 * <p>The count is parsed from the UTF-8 body of the HTTP response that completes the future handed to
 * that actor.
 *
 * @param ctx the request context passed on to the per-request actor factory
 * @param command the command to send
 * @return a route completed with the re-packaged response
 */
private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
  final CompletableFuture<HttpResponse> actorResponse = new CompletableFuture<>();

  // Deliver the command (followed by the completion message) to the per-request actor.
  Source.single(command)
      .to(Sink.actorRef(createHttpPerRequestActor(ctx, actorResponse), HttpRequestActor.COMPLETE_MESSAGE))
      .run(materializer);

  // Collect the whole entity body, parse it as an integer and wrap it into a JSON response.
  final Source<HttpResponse, ?> countResponses = Source.fromCompletionStage(actorResponse)
      .flatMapConcat(response -> response.entity().getDataBytes())
      .fold(ByteString.empty(), ByteString::concat)
      .map(body -> Integer.valueOf(body.utf8String()))
      .map(thingsCount -> JsonObject.newBuilder().set("allThingsCount", thingsCount).build())
      .map(json -> HttpResponse.create()
          .withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(json.toString()))
          .withStatus(HttpStatusCode.OK.toInt()));

  final CompletionStage<HttpResponse> allThingsCountHttpResponse =
      countResponses.runWith(Sink.head(), materializer);
  return completeWithFuture(allThingsCountHttpResponse);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors

/**
 * Inserts or updates the given thing in the search persistence, guarded by the circuit breaker.
 *
 * @param thing the thing to persist; its revision is used as the sequence number
 * @return the last element emitted by the persistence stream
 * @throws IllegalArgumentException if the thing carries no revision
 */
private CompletionStage<Boolean> updateThing(final Thing thing) {
  final long revision = thing.getRevision()
      .map(ThingRevision::toLong)
      .orElseThrow(() -> new IllegalArgumentException(
          MessageFormat.format("The Thing <{0}> has no revision!", thingId)));

  final StartedTimer updateTimer = DittoMetrics.expiringTimer(TRACE_THING_MODIFIED)
      .tag(UPDATE_TYPE_TAG, "modified")
      .build();

  return circuitBreaker.callWithCircuitBreakerCS(() ->
      searchUpdaterPersistence.insertOrUpdate(thing, revision, policyRevision)
          .via(stopTimer(updateTimer))
          .runWith(Sink.last(), materializer));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

/**
 * Runs an insert / find / delete round trip for a randomly generated document id.
 *
 * @return an empty Optional when exactly one delete count of 1 was observed; otherwise an Optional
 *     holding the stream failure or an {@link IllegalStateException} describing the unexpected result
 */
private CompletionStage<Optional<Throwable>> generateStatusResponse() {
  final String probeId = UUID.randomUUID().toString();
  return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, probeId)))
      .flatMapConcat(inserted -> Source.fromPublisher(collection.find(eq(ID_FIELD, probeId)))
          .flatMapConcat(found -> Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, probeId)))
              .map(DeleteResult::getDeletedCount)))
      .runWith(Sink.seq(), materializer)
      .handle((deletedCounts, failure) -> {
        if (failure != null) {
          return Optional.of(failure);
        }
        if (Objects.equals(deletedCounts, Collections.singletonList(1L))) {
          return Optional.empty();
        }
        final String message = "Expect 1 document inserted and deleted. Found: " + deletedCounts;
        return Optional.of(new IllegalStateException(message));
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * Applies the given update to the collection and, if it matched at least one document, removes the
 * policy index entries of the thing.
 *
 * @param thingId id of the thing whose policy entries are removed
 * @param filter filter passed to {@code updateOne}
 * @param document update document passed to {@code updateOne}
 * @return a source emitting {@code false} when nothing matched, otherwise {@code true} for each result
 *     emitted by the policy-entry deletion
 */
private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
  return Source.fromPublisher(collection.updateOne(filter, document))
      .flatMapConcat(updateResult -> {
        final boolean thingMatched = updateResult.getMatchedCount() > 0;
        if (!thingMatched) {
          return Source.single(Boolean.FALSE);
        }
        final Bson removeFilter =
            PolicyUpdateFactory.createDeleteThingUpdate(thingId).getPolicyIndexRemoveFilter();
        return Source.fromPublisher(policiesCollection.deleteMany(removeFilter))
            .map(ignored -> true);
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
/**
 * Executes the aggregation and emits the count stored under
 * {@code PersistenceConstants.COUNT_RESULT_NAME}, or {@code 0} if the aggregation yields no document.
 *
 * @param policyRestrictedSearchAggregation the aggregation to execute; must not be null
 * @return a source of the count
 */
@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
  checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");

  final Source<Document, NotUsed> documents = policyRestrictedSearchAggregation.execute(collection, maxQueryTime);
  return documents
      .map(document -> document.get(PersistenceConstants.COUNT_RESULT_NAME))
      // go through Number so that both Integer and Long results are accepted
      .map(rawCount -> ((Number) rawCount).longValue())
      .orElse(Source.<Long>single(0L))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: joshlong/reactive-spring-online-training

/**
 * Publishes the union of the hashtags of all tweets, one element per distinct hashtag.
 */
Publisher<HashTag> hashtags() {
    return Source
      .fromPublisher(this.tweets())
      .map(Tweet::getHashTags)
      .reduce((left, right) -> {
          // union of both sides in a fresh set
          Set<HashTag> merged = new HashSet<>(left);
          merged.addAll(right);
          return merged;
      })
      .mapConcat(tags -> tags)
      .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), this.actorMaterializer);
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<NotUsed, NotUsed> updateLastSuccessfulStreamEnd(final Instant timestamp) {
  final Date mongoStorableDate = Date.from(timestamp);
  final Document toStore = new Document()
      .append(FIELD_TIMESTAMP, mongoStorableDate);
  return Source.fromPublisher(lastSuccessfulSearchSyncCollection.insertOne(toStore))
      .map(success -> {
        LOGGER.debug("Successfully inserted timestamp for search synchronization: <{}>.", timestamp);
        return NotUsed.getInstance();
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-akka

/**
 * Handles every incoming message by pairing it with its sender and running it through
 * {@code messageHandler} as a one-element stream.
 */
@Override
  public Receive createReceive() {
    return ReceiveBuilder.create()
        .matchAny(received -> {
          // attach the correlation id to the log when the message carries Ditto headers
          if (received instanceof WithDittoHeaders) {
            LogUtil.enhanceLogWithCorrelationId(log, (WithDittoHeaders<?>) received);
          }
          log.debug("Received message: <{}>.", received);

          final WithSender withSender = WithSender.of(received, getSender());
          Source.single(withSender).runWith(messageHandler, materializer);
        })
        .build();
  }
}

代码示例来源:origin: com.lightbend.lagom/lagom-javadsl-testkit

/**
 * Feeds a single event with its offset through the handler flow of every processor registered for the
 * event's tag type.
 *
 * @param e the event to feed
 * @param offset the offset paired with the event
 * @return the result of {@code doAll} over the per-handler stream completions
 * @throws RuntimeException if no processor is registered for the event type
 */
public <Event extends AggregateEvent<Event>> CompletionStage<Done> feed(Event e, Offset offset) {
 AggregateEventTagger<Event> tagger = e.aggregateTag();
 Class<?> eventType = tagger.eventType();
 List<Pair<ReadSideHandler<?>, Offset>> registered = processors.get(eventType);
 if (registered == null) {
  throw new RuntimeException("No processor registered for Event " + eventType.getCanonicalName());
 }
 List<CompletionStage<?>> stages = registered.stream().map(handlerAndOffset -> {
  @SuppressWarnings("unchecked") ReadSideHandler<Event> eventHandler = (ReadSideHandler<Event>) handlerAndOffset.first();
  Flow<Pair<Event, Offset>, Done, ?> handlerFlow = eventHandler.handle();
  return Source.single(Pair.create(e, offset))
    .via(handlerFlow)
    .runWith(Sink.ignore(), materializer);
 }).collect(Collectors.toList());
 return doAll(stages);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

/**
 * Drops the indices of the collection that are reported by {@code getUndefinedIndexNames} for the
 * given definitions.
 *
 * @param collectionName the collection to clean up
 * @param definedIndices the indices that are expected to exist
 * @return a stage that completes when the drop stream has finished
 */
private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
  return getIndicesExceptDefaultIndex(collectionName)
      .flatMapConcat(existing -> {
        LOGGER.info("Drop undefined indices - Existing indices are: {}", existing);

        final List<String> obsoleteIndexNames = getUndefinedIndexNames(existing, definedIndices);
        LOGGER.info("Dropping undefined indices: {}", obsoleteIndexNames);

        return dropIndices(collectionName, obsoleteIndexNames);
      })
      .runWith(Sink.ignore(), materializer);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

/**
 * Reads at most one document from the last-successful-sync collection and emits its timestamp.
 *
 * @return a source emitting the stored timestamp, or an empty Optional when the collection yields no
 *     document
 */
private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
  return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
      .limit(1)
      .flatMapConcat(syncDocument -> {
        final Instant lastStreamEnd = syncDocument.getDate(FIELD_TIMESTAMP).toInstant();
        LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", lastStreamEnd);
        return Source.single(Optional.of(lastStreamEnd));
      })
      .orElse(Source.single(Optional.empty()));
}

代码示例来源:origin: apptik/RHub

/**
 * Re-publishes the processor through an Akka stream so that callers receive a fan-out publisher
 * instead of the processor itself.
 */
@Override
@SuppressWarnings("unchecked")
protected Publisher hide(Processor processor) {
  final Object fanoutPublisher = Source
      .fromPublisher(processor)
      .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
  return (Publisher) fanoutPublisher;
}

代码示例来源:origin: eclipse/ditto

/**
 * Passes a {@code WithSender} result on as a one-element source; any other result is only logged and
 * replaced by an empty source.
 *
 * @param result the value to inspect
 * @return a source with the result, or an empty source after logging
 */
private static Graph<SourceShape<WithSender>, NotUsed> keepResultAndLogErrors(final Object result) {
  if (result instanceof WithSender) {
    return Source.single((WithSender) result);
  }

  if (result instanceof DittoRuntimeException) {
    // logged with INFO_LEVEL attributes, then dropped
    return Source.single(result)
        .log("PreEnforcer replied DittoRuntimeException")
        .withAttributes(INFO_LEVEL)
        .flatMapConcat(dropped -> Source.empty());
  }

  // anything else is logged with ERROR_LEVEL attributes, then dropped
  return Source.single(result)
      .log("PreEnforcer encountered unexpected exception")
      .withAttributes(ERROR_LEVEL)
      .flatMapConcat(dropped -> Source.empty());
}

代码示例来源:origin: akarnokd/akarnokd-misc

/**
 * Demo entry point: publishes an endless stream of the value 2 (1 repeated, plus one) through a
 * fan-out publisher, lets the subscriber from {@code println()} consume it for one second, then
 * terminates the actor system.
 */
public static void main(String[] args) throws Exception {
    Config config = ConfigFactory.parseResources(AkkaRange.class, "/akka-streams.conf").resolve();
    ActorSystem system = ActorSystem.create("sys", config);
    ActorMaterializer mat = ActorMaterializer.create(system);

    Publisher<Integer> publisher = Source.repeat(1)
        .map(one -> one + 1)
        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);

    publisher.subscribe(println());

    // give the stream one second to run before shutting down
    Thread.sleep(1000);

    system.terminate();
}
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
/**
 * Retrieves the ids of all things that reference the given policy.
 *
 * <p>The accumulating set is created inside {@code flatMapConcat}, i.e. once per materialization of
 * the returned source. Passing a mutable set directly as the {@code fold} zero would share one
 * instance between all materializations, so running the source twice would leak ids from the first
 * run into the second.
 *
 * @param policyId the policy id to filter by
 * @return a source emitting exactly one set with the matching thing ids (empty if none match)
 */
@Override
public final Source<Set<String>, NotUsed> getThingIdsForPolicy(final String policyId) {
  log.debug("Retrieving Thing ids for policy: <{}>", policyId);
  final Bson filter = eq(FIELD_POLICY_ID, policyId);
  return Source.single(filter)
      .flatMapConcat(policyFilter -> {
        // fresh accumulator for every materialization
        final Set<String> thingIds = new HashSet<>();
        return Source.fromPublisher(collection.find(policyFilter)
            .projection(new BsonDocument(FIELD_ID, new BsonInt32(1))))
            .map(doc -> doc.getString(FIELD_ID))
            .fold(thingIds, (set, id) -> {
              set.add(id);
              return set;
            });
      });
}

代码示例来源:origin: com.typesafe.play/play_2.11

/**
 * Builds a chunked HTTP entity from a source of ByteStrings, wrapping every element in an
 * {@code HttpChunk.Chunk}.
 *
 * @param data The source of ByteStrings.
 * @param contentType The optional content type.
 * @return The chunked entity.
 */
public static final HttpEntity chunked(Source<ByteString, ?> data, Optional<String> contentType) {
  final Source<HttpChunk, ?> chunks = data.map(HttpChunk.Chunk::new);
  return new Chunked(chunks, contentType);
}

相关文章