从2016年的阅读谷歌群帖中可以看出:“.map()已转换为.via()”
源代码:https://groups.google.com/g/akka-user/c/EzHygZpcCHg
下列代码行是否等效:
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
是否存在应使用图而非流的情况?
源代码:
请求DTO:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
private final Date datePurchased;
}
StreamManager(包含主方法):
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.CompletionStage;
public class StreamManager {
final static ObjectMapper mapper = new ObjectMapper();
private static final Flow<String, RequestDto, NotUsed> mapToDtoFlow = Flow.of(String.class)
.map(input -> mapper.readValue(input, RequestDto.class))
.log("error");
public static void main(String args[]) {
final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final String json = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
}
}
1条答案
按热度按时间roejwanj1#
map
被转换为via
,但它在语法上并不完全等同于Flow.of().map()
。第一个将转换为
.via(Map(f))
,其中Map
是实现Map操作的GraphStage
。在第二种情况下,
mapToDtoFlow
(忽略log
)本身将是(在Scala表示法中)Flow[String].via(Map(f))
,因此您将添加另一层via
:.via(Flow[String].via(Map(f)))
.对于所有的意图和目的,它们都是相同的(我怀疑,当需要解释您构建的
RunnableGraph
时,物化器将完全相同地对待它们)。考虑到
.log
,mapToDtoFlow
是等价的(同样在Scala中):在Akka Streams中,基本上有三个定义流的级别,从最高级别到最低级别:
GraphStage
秒DSL仅仅指定了构建
GraphStage
的简洁方法,而将GraphStage
与Flow
形状链接起来的基本方法是通过via
操作。