Akka Stream,Tcp().bind,客户端关闭套接字时的句柄

332nm8kg  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(137)

我是Akka流的新手,我想学习如何为我的一个项目处理TCP套接字。我从Akka Stream official documentation中提取了这段代码。

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

如果我使用netcat从终端进行连接,我可以看到Akka Stream TCP套接字按预期工作。

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < - - - - - - HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

我找不到的是如何管理由CMD + C操作关闭的套接字。Akka Stream使用Akka.io在内部管理TCP连接,因此当套接字关闭时,它必须发送一些PeerClose消息。因此,我对Akka.io的了解告诉我,我应该从套接字关闭中接收到反馈,但我找不到如何使用Akka Stream进行此操作。有办法解决吗?

93ze6v8z

93ze6v8z1#

connection.handleWith(echo)connection.flow.joinMat(echo)(Keep.right).run()的语法糖,它的物化值为echo,这通常是无用的。Flow.via.map.takeWhile的物化值为NotUsed,因此基本上也是无用的。但是,您可以将stage附加到echo,它将以不同的方式物化。
其中之一是.watchTermination

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < - - - - - - HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))
    // change the materialized value to a Future[Done]
    .watchTermination()(Keep.right)

  // you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
  //  if you don't already
  connection.handleWith(echo).onComplete {
    case Success(_) => println("stream completed successfully")
    case Failure(e) => println(e.getMessage)
  }
}

这将不区分您端或远程端正常关闭连接;它将辨别出流失败。

相关问题