akka 将源[ByteString,NotUsed]值输出到控制台

tuwxkamq  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(185)

如何在控制台中打印源的值。

val someSource = Source.single(ByteString("SomeValue"))

我想从这个源中打印字符串“SomeValue”。我尝试了:

someSource.to(Sink.foreach(println)) //This one prints RunnableGraph object

someSource.map(each => {
    val pqr = each.decodeString(ByteString.UTF_8)
    print(pqr)
}) // THis one prints res3: soneSource.Repr[Unit]  = Source(SourceShape(Map.out(169373838)))

如何打印最初用于创建单个对象的源的原始字符串。

yyyllmsg

yyyllmsg1#

从问题中所写的内容来看,我认为您很可能正在使用Scala控制台或Scala工作表。
在Scala控制台或工作集中,它打印当前语句中创建的内容的表示。

scala> val i = 5
val i: Int = 5

scala> val s = "ssfdf"
val s: String = ssfdf

但是,当你在这里使用println时,

scala> val u = println("dfsd")
dfsd
val u: Unit = ()

它还执行println,然后打印该println创建的值u实际上是一个Unit
这可能就是您的困惑所在,因为Sink.foreach中的println在本例中不起作用。
这是因为这种情况更像下面的情况,即您实际上是在定义一个函数。

scala> val f1 = (s: String) => println(s)
val f1: String => Unit = $Lambda$1062/0x0000000800689840@1796b2d4

这里没有使用println,只是定义了一个将使用println的函数(String => UnitFunction1[String, Unit]的示例)。
因此,控制台只输出这里创建的值f1的类型为String => Unit
您将需要调用此函数来实际执行println

scala> f1.apply("dsfsd")
dsfsd

类似地,someSource.to(Sink.foreach(println))将创建一个RunnableGraph类型的值,因此scala控制台将打印类似于val res0: RunnableGraph...的内容。
现在,您需要运行此图以实际执行它。
但是与前面的函数示例相比,graph的执行是在线程池上异步发生的,这意味着它可能在Scala控制台或工作集的某些版本中不起作用(取决于线程池生命周期是如何管理的)。

scala> val someSource = Source.single(ByteString("SomeValue"))
val someSource: akka.stream.scaladsl.Source[akka.util.ByteString,akka.NotUsed] = Source(SourceShape(single.out(369296388)))

scala> val runnableGraph = someSource.to(Sink.foreach(println))
val runnableGraph: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = RunnableGraph

scala> runnableGraph.run()

如果它起作用,您将看到以下内容:

scala> runnableGraph.run()
val res0: akka.NotUsed = NotUsed
ByteString(83, 111, 109, 101, 86, 97, 108, 117, 101)

但您可能会看到一些错误,这些错误与控制台由于某些原因而无法完成图形运行有关。
实际上,你需要具体化Sink,它在运行图时会产生一个Future[Done],然后你需要等待使用AwaitFuture[Done]
您必须将所有这些内容放入一个普通的Scala文件中,并作为Scala应用程序执行。

import akka.{Done, actor}
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object TestAkkaStream extends App {

  val actorSystem = ActorSystem(Behaviors.empty, "test-stream-system")

  implicit val classicActorSystem = actorSystem.classicSystem

  val someSource = Source.single(ByteString("SomeValue"))

  val runnableGraph = someSource.toMat(Sink.foreach(println))(Keep.right)

  val graphRunDoneFuture: Future[Done] = runnableGraph.run()

  Await.result(graphRunDoneFuture, Duration.Inf)
}

相关问题