我正在尝试运行一个简单的Akka流文件接收器示例,但是没有成功。我可以创建一个Source,运行Flow,然后创建一个文件,但是ByteString没有被写入文件。然而,如果我尝试将流输出打印到控制台,我可以这样做。我是否遗漏了什么?
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done}
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths
object First extends App {
val source: Source[Int, NotUsed] = Source ( 1 to 100)
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
// works: prints 1-100
//source.runForeach(println) (materializer)
val factorials = source.scan(BigInt(1))((acc,next) => acc * next)
// there is no content in the Sink (file)
/**val result =
factorials
.map(num => ByteString(s"${num}\n"))
.runWith(FileIO.toPath(Paths.get("factorials.txt")))
**/
def lineSink(fileName: String): Sink[String, Future[IOResult]] =
Flow[String]
.map(s => ByteString(s + "\n"))
.toMat(FileIO.toPath(Paths.get(fileName))) (Keep.right)
//There is no content in the Sink.
factorials.map(_.toString).runWith(lineSink("factorials.txt"))
system.terminate()
}
版本.sbt具有:
name := "akkaGuide"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.4.10"
)
提前感谢您抽出时间。
2条答案
按热度按时间vybvopom1#
我想您可能终止得太早了。请尝试等待
Future
完成:9gm1akwq2#
请看下面的工作示例: