akka 标测图和Flow之间的差异

tpgth1q7  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(137)

从2016年的阅读谷歌群帖中可以看出:“.map()已转换为.via()”
源代码:https://groups.google.com/g/akka-user/c/EzHygZpcCHg
下列代码行是否等效:

Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);

是否存在应使用图而非流的情况?
源代码:
请求DTO:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;
}

StreamManager(包含主方法):

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.concurrent.CompletionStage;

public class StreamManager {

    final static ObjectMapper mapper = new ObjectMapper();

    private static final Flow<String, RequestDto, NotUsed> mapToDtoFlow = Flow.of(String.class)
            .map(input -> mapper.readValue(input, RequestDto.class))
            .log("error");

    public static void main(String args[]) {

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
        final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final String json = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";

        Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
        Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);

    }

}
roejwanj

roejwanj1#

map被转换为via,但它在语法上并不完全等同于Flow.of().map()
第一个将转换为.via(Map(f)),其中Map是实现Map操作的GraphStage
在第二种情况下,mapToDtoFlow(忽略log)本身将是(在Scala表示法中)Flow[String].via(Map(f)),因此您将添加另一层via.via(Flow[String].via(Map(f))) .
对于所有的意图和目的,它们都是相同的(我怀疑,当需要解释您构建的RunnableGraph时,物化器将完全相同地对待它们)。
考虑到.logmapToDtoFlow是等价的(同样在Scala中):

Flow[String]
  .via(Map(f))
  .via(Log(...))

在Akka Streams中,基本上有三个定义流的级别,从最高级别到最低级别:

  • Java/Scala DSL语言
  • Java/Scala图形DSL
  • GraphStage

DSL仅仅指定了构建GraphStage的简洁方法,而将GraphStageFlow形状链接起来的基本方法是通过via操作。

相关问题