我是Akka流的新手,我想学习如何为我的一个项目处理TCP套接字。我从Akka Stream official documentation中提取了这段代码。
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
如果我使用netcat从终端进行连接,我可以看到Akka Stream TCP套接字按预期工作。
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWhile(_.toLowerCase.trim != "exit") // < - - - - - - HERE
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
我找不到的是如何管理由CMD + C
操作关闭的套接字。Akka Stream使用Akka.io在内部管理TCP连接,因此当套接字关闭时,它必须发送一些PeerClose
消息。因此,我对Akka.io的了解告诉我,我应该从套接字关闭中接收到反馈,但我找不到如何使用Akka Stream进行此操作。有办法解决吗?
1条答案
按热度按时间93ze6v8z1#
connection.handleWith(echo)
是connection.flow.joinMat(echo)(Keep.right).run()
的语法糖,它的物化值为echo
,这通常是无用的。Flow.via.map.takeWhile
的物化值为NotUsed
,因此基本上也是无用的。但是,您可以将stage附加到echo
,它将以不同的方式物化。其中之一是
.watchTermination
:这将不区分您端或远程端正常关闭连接;它将辨别出流失败。