Akka Streams: unable to write to a file Sink

Posted by fae0ux8s on 2022-11-06 in Other

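I'm trying to run a simple Akka Streams file Sink example, without success. I can create a Source and run the Flow, and the file is created, but the ByteString elements are never written to it. If I print the stream output to the console instead, that works. Am I missing something?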

import akka.stream._ 
import akka.stream.scaladsl._
import akka.{ NotUsed, Done}
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths

object First extends App {

  val source: Source[Int, NotUsed] = Source ( 1 to 100)

  implicit val system = ActorSystem("QuickStart")
  implicit val materializer = ActorMaterializer()

  // works: prints 1-100
  //source.runForeach(println) (materializer)

  val factorials = source.scan(BigInt(1))((acc,next) => acc * next)

  // there is no content in the Sink (file)
  /**val result =
    factorials
    .map(num => ByteString(s"${num}\n"))
    .runWith(FileIO.toPath(Paths.get("factorials.txt")))

**/

  def lineSink(fileName: String): Sink[String, Future[IOResult]] =
    Flow[String]
    .map(s => ByteString(s + "\n"))
    .toMat(FileIO.toPath(Paths.get(fileName))) (Keep.right)

  //There is no content in the Sink.
  factorials.map(_.toString).runWith(lineSink("factorials.txt"))

  system.terminate()

}

The build.sbt has:

name := "akkaGuide"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.4.10"
)

Thanks in advance for your time.

vybvopom1#

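I think you may be terminating too early. The stream runs asynchronously, so calling system.terminate() right after runWith can shut the actor system down before anything has been written. Try waiting for the Future to complete: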

val result = factorials.map(_.toString).runWith(lineSink("factorials.txt"))
import system.dispatcher
result.onComplete { _ => system.terminate() }
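
For a small program where blocking is acceptable, the same idea can be sketched with Await.result instead of a callback. This is only a sketch against the question's setup (akka-stream 2.4.10); the object name FactorialsToFile, the helper writeFactorials, and the 10 second timeout are assumptions made for illustration, not part of the original code.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FactorialsToFile {

  // Same sink as in the question: one line per element, materializes Future[IOResult].
  def lineSink(fileName: String): Sink[String, Future[IOResult]] =
    Flow[String]
      .map(s => ByteString(s + "\n"))
      .toMat(FileIO.toPath(Paths.get(fileName)))(Keep.right)

  // Hypothetical helper: runs the stream and blocks until the file sink has finished.
  def writeFactorials(fileName: String): IOResult = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()
    try {
      val factorials =
        Source(1 to 100).scan(BigInt(1))((acc, next) => acc * next)
      val result: Future[IOResult] =
        factorials.map(_.toString).runWith(lineSink(fileName))
      // Wait for the write to finish (timeout is an arbitrary choice).
      Await.result(result, 10.seconds)
    } finally {
      // Terminate only after the stream has completed or failed.
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val io = writeFactorials("factorials.txt")
    println(s"Wrote ${io.count} bytes, status: ${io.status}")
  }
}
```

Blocking like this is convenient in a main method or a test; inside a larger application the onComplete callback shown above is usually the better fit because it does not tie up a thread.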

9gm1akwq2#

See the working example below:

package ru.io

import java.io.File

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.util.ByteString

import scala.util.{Failure, Success}

object WriteStreamApp extends App {
  implicit val actorSystem      = ActorSystem()
  implicit val flowMaterializer = ActorMaterializer()
  import actorSystem.dispatcher

  // Source
  val source = Source(1 to 10000).filter(isPrime)

  // Sink
  val sink = FileIO.toFile(new File("src/main/resources/prime.txt"))

  // output for file
  val fileSink = Flow[Int]
    .map(i => ByteString(s"$i\n")) // one prime per line; without the newline the digits run together
    .toMat(sink)((_, bytesWritten) => bytesWritten)

  val consoleSink = Sink.foreach[Int](println)

  // using the Graph API, send the integers to both sinks: file and console
  val graph = GraphDSL.create(fileSink, consoleSink)((file, _) => file) { implicit builder => (file, console) =>
    import GraphDSL.Implicits._

    val broadCast = builder.add(Broadcast[Int](2))

    source ~> broadCast ~> file
    broadCast ~> console
    ClosedShape
  }

  val materialized = RunnableGraph.fromGraph(graph).run()

  // make sure the system is terminated
  materialized.onComplete {
    case Success(_) =>
      actorSystem.terminate()
    case Failure(e) =>
      println(s"Failure: ${e.getMessage}")
      actorSystem.terminate()
  }

  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 until n).exists(x => n % x == 0)
  }
}
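
If the only goal is to print each element while also writing it to a file, the explicit Broadcast graph is not strictly required. As a hedged alternative sketch, the alsoTo operator attaches a second sink to a linear stream. The object name PrimesWithAlsoTo, the helper writePrimes, the output file name, and the 30 second timeout are assumptions for illustration; FileIO.toPath is used here as in the question rather than FileIO.toFile.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PrimesWithAlsoTo {

  // Same trial-division check as in the answer above.
  def isPrime(n: Int): Boolean =
    if (n <= 1) false
    else if (n == 2) true
    else !(2 until n).exists(x => n % x == 0)

  // Hypothetical helper: prints each prime and writes it to fileName, one per line.
  def writePrimes(fileName: String): IOResult = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    try {
      val written: Future[IOResult] =
        Source(1 to 10000)
          .filter(isPrime)
          .alsoTo(Sink.foreach[Int](println)) // side branch: console
          .map(i => ByteString(s"$i\n"))
          .runWith(FileIO.toPath(Paths.get(fileName))) // main branch: file
      // Block until the file sink reports completion (timeout is arbitrary).
      Await.result(written, 30.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val io = writePrimes("prime.txt")
    println(s"Wrote ${io.count} bytes")
  }
}
```

The GraphDSL version remains the more general tool when the topology has more than one extra branch or needs junctions other than a simple fan-out.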
