akka websocket客户端:主动断开与服务器的连接和/或替换接收器

inb24sb2  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(464)

我开始学习akka,遇到了一个挑战,尽管我已经阅读了文档和相关的stakoverflow问题,但我找不到一个简单的解决方案:
基于akka网站上的客户端websocket支持示例,我在scala中使用以下代码段作为基础:

val flow: Flow[Message, Message, Future[Done]] =
  Flow.fromSinkAndSourceMat(printSink, Source.maybe)(Keep.left)

  val (upgradeResponse, closed) =
  Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)

我使用的用例是一个客户端(printsink),它使用来自websocket服务器的连续流。通信仅单向,因此不需要源。
我的问题是:
我需要定期强制重新连接到websocket服务器,为此我需要先断开连接。但就我的一生而言,我找不到一个简单的断开连接的方法
在一个稍微相反的场景中,我需要保持websocket连接处于活动状态并“交换”接收器。这是否可能,即不创建另一个websocket连接?

ekqde3dh

ekqde3dh1#

对于问题1(强制断开与客户端的连接),这应该是可行的

val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message])] =
  Flow.fromSinkAndSourceMat(
    printSink,
    Source.maybe
  )(Keep.both)

val (upgradeResponse, (closed, disconnect)) =
  Http().singleWebsocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
``` `disconnect` 然后可以用 `None` 断开连接:

disconnect.success(None)

对于问题2,我的直觉是,这种动态流操作似乎需要一个自定义流操作符(即,一个级别低于graph dsl,两个级别低于“normal”) `scaladsl` / `javadsl` ). 老实说,我在那里没有太多的直接经验。

相关问题