我正在使用Akka Streams从一个文件中输出文本行。我现在要做的就是把每一行都打印到stdout。我可以在没有任何节流的情况下完成这个工作,也就是说,所有的行都立即打印到stdout。但是当我尝试节流流时,比如每秒打印1行,流在把第一行打印到stdout后似乎就取消了。
object FStream {
// Make config implicit.
implicit val conf = ConfigUtils.loadAppConfig[ArrivalsAppConfig]("arrivals")
import AkkaStreamUtils.defaultActorSystem._
// Source of raw data
def rawDataStream(path: String): Source[ByteString, Future[IOResult]] = {
val file = Paths.get(path)
val ioRes: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)
ioRes
}
def main(args: Array[String]): Unit = {
val eSource: Source[ByteString, Future[IOResult]] =
rawDataStream(conf.eventsFilePath)
val eFlow =
Flow[ByteString]
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000))
.map(bs => bs.utf8String)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
val eSink = Sink.foreach(println)
eSource.via(eFlow).runWith(eSink)
}
}
输入文件
James
Is
My
First
Name
预期结果
$ sbt run
James # 0s
Is # 1s
My # 2s
First # 3s
Name # 2s
实际结果
$ sbt run
James # 0s
1条答案
按热度按时间xxls0lw81#
您的主方法没有等待流完成。
将
runWith
与Sink.foreach
组合后,您将得到需要等待的Future
:请参阅https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html上的文档
接收器具体化为
Future[Done]
,它在流完成时完成,或者在流失败时失败。