akka.stream.javadsl.Source.single()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.7k)|赞(0)|评价(0)|浏览(111)

本文整理了Java中akka.stream.javadsl.Source.single()方法的一些代码示例,展示了Source.single()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Source.single()方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:single

Source.single介绍

暂无

代码示例

代码示例来源:origin: com.typesafe.play/play_2.11

@Override
public Source<ByteString, ?> dataStream() {
  return Source.<ByteString>single(data);
}

代码示例来源:origin: com.typesafe.play/play_2.12

@Override
public Source<ByteString, ?> dataStream() {
  return Source.<ByteString>single(data);
}

代码示例来源:origin: com.typesafe.play/play

@Override
public Source<ByteString, ?> dataStream() {
  return Source.<ByteString>single(data);
}

代码示例来源:origin: com.typesafe.play/play-streams_2.11

public CompletionStage<A> run(E element, Materializer mat) {
  return run(Source.single(element), mat);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

private Source<ThingMetadata, NotUsed> defaultThingMetadata() {
  return Source.single(new ThingMetadata(-1L, null, -1L));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
  return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
      .limit(1)
      .flatMapConcat(doc -> {
        final Date date = doc.getDate(FIELD_TIMESTAMP);
        final Instant timestamp = date.toInstant();
        LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", timestamp);
        return Source.single(Optional.of(timestamp));
      })
      .orElse(Source.single(Optional.empty()));
}

代码示例来源:origin: eclipse/ditto

private Source<Optional<Instant>, NotUsed> retrieveLastSuccessfulStreamEndAsync() {
  return Source.fromPublisher(lastSuccessfulSearchSyncCollection.find())
      .limit(1)
      .flatMapConcat(doc -> {
        final Date date = doc.getDate(FIELD_TIMESTAMP);
        final Instant timestamp = date.toInstant();
        LOGGER.debug("Returning last timestamp of search synchronization: <{}>.", timestamp);
        return Source.single(Optional.of(timestamp));
      })
      .orElse(Source.single(Optional.empty()));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

private static PartialFunction<Throwable, Source<Success, NotUsed>> buildDropIndexRecovery(
    final String indexDescription) {
  return new PFBuilder<Throwable, Source<Success, NotUsed>>()
      .match(MongoCommandException.class, IndexOperations::isIndexNotFound, throwable -> {
        LOGGER.debug("Index <{}> could not be dropped because it does not exist (anymore).",
            indexDescription);
        return Source.single(Success.SUCCESS);
      })
      .build();
}

代码示例来源:origin: eclipse/ditto

private static PartialFunction<Throwable, Source<Success, NotUsed>> buildDropIndexRecovery(
    final String indexDescription) {
  return new PFBuilder<Throwable, Source<Success, NotUsed>>()
      .match(MongoCommandException.class, IndexOperations::isIndexNotFound, throwable -> {
        LOGGER.debug("Index <{}> could not be dropped because it does not exist (anymore).",
            indexDescription);
        return Source.single(Success.SUCCESS);
      })
      .build();
}

代码示例来源:origin: com.lightbend.lagom/lagom-javadsl-testkit

public <Event extends AggregateEvent<Event>> CompletionStage<Done> feed(Event e, Offset offset) {
 AggregateEventTagger<Event> tag = e.aggregateTag();
 List<Pair<ReadSideHandler<?>, Offset>> list = processors.get(tag.eventType());
 if (list == null) {
  throw new RuntimeException("No processor registered for Event " + tag.eventType().getCanonicalName());
 }
 List<CompletionStage<?>> stages = list.stream().map(pHandlerOffset -> {
  @SuppressWarnings("unchecked") ReadSideHandler<Event> handler = (ReadSideHandler<Event>) pHandlerOffset.first();
  Flow<Pair<Event, Offset>, Done, ?> flow = handler.handle();
    return Source.single(Pair.create(e, offset)).via(flow).runWith(Sink.ignore(), materializer);
   }
 ).collect(Collectors.toList());
 return doAll(stages);
}

代码示例来源:origin: com.typesafe.play/play-java

/**
 * Produces a flow of ByteString with a prepended block and a script wrapper.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<ByteString, ByteString, NotUsed> flow(String callbackName) {
  ByteString cb = ByteString.fromString(callbackName);
  return Flow.of(ByteString.class).map((msg) -> {
    return formatted(cb, msg);
  }).prepend(Source.single(initialChunk));
}

代码示例来源:origin: com.typesafe.play/play-java_2.12

/**
 * Produces a flow of ByteString with a prepended block and a script wrapper.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<ByteString, ByteString, NotUsed> flow(String callbackName) {
  ByteString cb = ByteString.fromString(callbackName);
  return Flow.of(ByteString.class).map((msg) -> {
    return formatted(cb, msg);
  }).prepend(Source.single(initialChunk));
}

代码示例来源:origin: com.typesafe.play/play-java_2.11

/**
 * Produces a flow of ByteString with a prepended block and a script wrapper.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<ByteString, ByteString, NotUsed> flow(String callbackName) {
  ByteString cb = ByteString.fromString(callbackName);
  return Flow.of(ByteString.class).map((msg) -> {
    return formatted(cb, msg);
  }).prepend(Source.single(initialChunk));
}

代码示例来源:origin: apptik/RHub

@Override
public void emit(Object event) {
  Source.single(event).viaMat(busFlow, Keep.right())
      .to(Sink.ignore())
      .run(mat);
}

代码示例来源:origin: wxyyxc1992/Backend-Boilerplates

public static TextMessage handleTextMessage(TextMessage msg) {
  if (msg.isStrict()) // optimization that directly creates a simple response...
  {
    return TextMessage.create("Hello " + msg.getStrictText());
  } else // ... this would suffice to handle all text messages in a streaming fashion
  {
    return TextMessage.create(Source.single("Hello ").concat(msg.getStreamedText()));
  }
}
//#websocket-handler

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-akka

@Override
  public Receive createReceive() {
    return ReceiveBuilder.create()
        .matchAny(message -> {
          if (message instanceof WithDittoHeaders) {
            LogUtil.enhanceLogWithCorrelationId(log, (WithDittoHeaders<?>) message);
          }
          log.debug("Received message: <{}>.", message);
          final WithSender wrapped = WithSender.of(message, getSender());
          Source.single(wrapped).runWith(messageHandler, materializer);
        })
        .build();
  }
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
  return Source.fromPublisher(collection.updateOne(filter, document))
      .flatMapConcat(deleteResult -> {
        if (deleteResult.getMatchedCount() <= 0) {
          return Source.single(Boolean.FALSE);
        }
        final PolicyUpdate deletePolicyEntries = PolicyUpdateFactory.createDeleteThingUpdate(thingId);
        final Bson policyIndexRemoveFilter = deletePolicyEntries.getPolicyIndexRemoveFilter();
        return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
            .map(r -> true);
      });
}

代码示例来源:origin: eclipse/ditto

private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
  return Source.fromPublisher(collection.updateOne(filter, document))
      .flatMapConcat(deleteResult -> {
        if (deleteResult.getMatchedCount() <= 0) {
          return Source.single(Boolean.FALSE);
        }
        final PolicyUpdate deletePolicyEntries = PolicyUpdateFactory.createDeleteThingUpdate(thingId);
        final Bson policyIndexRemoveFilter = deletePolicyEntries.getPolicyIndexRemoveFilter();
        return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
            .map(r -> true);
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
  checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");
  final Source<Document, NotUsed> source = policyRestrictedSearchAggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
      .map(countResult -> (Number) countResult)
      .map(Number::longValue) // use Number.longValue() to support both Integer and Long values
      .orElse(Source.<Long>single(0L))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
  checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");
  final Source<Document, NotUsed> source = policyRestrictedSearchAggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
      .map(countResult -> (Number) countResult)
      .map(Number::longValue) // use Number.longValue() to support both Integer and Long values
      .orElse(Source.<Long>single(0L))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

相关文章