Akka流在节流时失败

4ngedf3f  于 2022-12-13  发布在  其他
关注(0)|答案(1)|浏览(124)

我正在使用Akka Streams从一个文件中输出文本行。我现在要做的就是把每一行都打印到stdout。我可以在没有任何节流的情况下完成这个工作,也就是说,所有的行都立即打印到stdout。但是当我尝试节流流时,比如每秒打印1行,流在把第一行打印到stdout后似乎就取消了。

object FStream {

  // Make config implicit.
  implicit val conf = ConfigUtils.loadAppConfig[ArrivalsAppConfig]("arrivals")

  import AkkaStreamUtils.defaultActorSystem._

  // Source of raw data
  def rawDataStream(path: String): Source[ByteString, Future[IOResult]] = {

    val file = Paths.get(path)

    val ioRes: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)

    ioRes
  }

  def main(args: Array[String]): Unit = {

    val eSource: Source[ByteString, Future[IOResult]] = 
      rawDataStream(conf.eventsFilePath)

    val eFlow = 
      Flow[ByteString]
        .via(Framing.delimiter(ByteString(System.lineSeparator), 10000))
        .map(bs => bs.utf8String)
        .throttle(1, 1.second, 1, ThrottleMode.shaping)

    val eSink = Sink.foreach(println)

    eSource.via(eFlow).runWith(eSink)
  }
}

输入文件

James
Is
My
First
Name

预期结果

$ sbt run
James # 0s
Is    # 1s
My    # 2s
First # 3s
Name  # 2s

实际结果

$ sbt run
James # 0s
xxls0lw8

xxls0lw81#

您的主方法没有等待流完成。
runWithSink.foreach组合后,您将得到需要等待的Future

val f = eSource.via(eFlow).runWith(eSink)

Await.result(f, 10.seconds)

请参阅https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html上的文档
接收器具体化为Future[Done],它在流完成时完成,或者在流失败时失败。

相关问题