akka 具有BlockingQueue订阅的WebSocket字

tzxcd3kk  于 2023-03-18  发布在  其他
关注(0)|答案(1)|浏览(153)

bounty将在2天后过期。回答此问题可获得+200声望奖励。Marc Grue希望引起更多人关注此问题。

我想建立一个类似于GraphQl的订阅,客户端可以通过WebSocket请求Akka Http服务器订阅数据库更新的结果,并通过websocket将它们推回到客户端。我的数据库通过java BlockingQueue发出事务,当事务数据与查询匹配时,一个新的结果应该被推送。我正在一个单独的线程中设置阻塞队列,希望它能通过Akka事件总线发送回更新。
但我是一个Akka新手,并且坚持使用我的设置,Sink甚至没有接收到来自客户端的带有查询的传入消息。我的思路正确吗?或者您可以帮助我找到正确的方向吗?
这是我通过阅读文档和浏览各种WebSocket示例到目前为止所能整理出来的(简化的):

implicit val system          : ActorSystem              = ActorSystem()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

Http().newServerAt("localhost", 8080).bind(route)

lazy val route: Route = cors() {
  path("ws") {
    handleWebSocketMessages(
      Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
    )
  }
}

接收来自客户端的查询。显然这不是创建接收器的正确方法,因为没有接收到传入的消息。

val incomingMessages: Sink[Message, Future[Done]] = {
  Sink.foreach {
    case BinaryMessage.Strict(argsSerialized) =>
      println("Never gets here...")

      // Deserialize incoming query
      val query = deserialize(argsSerialized.asByteBuffer)

      // What to do when new query results emerge
      val callback: List[Any] => Unit = { (result: List[Any]) =>
        val resultBytes   = serialize(result)
        val binaryMessage = BinaryMessage(ByteString(resultBytes))

        // Publish serialized results to Akka eventbus
        system.eventStream.publish(binaryMessage)
      }

      // Subscribe to updated query results from queue in separate thread
      subscribe(query, callback)

    case other => throw new Exception("Unexpected incoming message: " + other)
  }
}

在更改与查询匹配时发回更新的查询结果:

val outgoingMessages: Source[Message, Any] = {
  Source.lazySource { () =>
    val (actorRef, itemSource) = Source
      .actorRef[BinaryMessage](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        1000,
        OverflowStrategy.backpressure // ?
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[BinaryMessage].runtimeClass)
    itemSource
  }
}

在每个新的数据库事务之间分离线程的永久循环和阻塞

def subscribe(query: String, callback: List[Data] => Unit): Unit = {
  object TxReportWatcher extends Runnable {
    override def run(): Unit = {
      while (true) {
        try {
          // `take` blocks until new data is transacted
          matchResult(query, javaBlockingQueue.take).foreach(machingResult =>
            // Publish new result to the Akka eventbus
            callback(machingResult)
          )
        } catch {
          case e: InterruptedException => e.printStackTrace()
        }
      }
    }
  }
  Executors.newSingleThreadExecutor().execute(TxReportWatcher)
}
wydwbb8l

wydwbb8l1#

找到了一个解决方案。可以使用Akka Source.queue将更改的结果推回客户端,而不是使用Akka事件总线来通知更改。
新的结果被序列化并以二进制消息的形式提供给queue,每次发生这种情况时,source都会触发将消息推回客户端,这允许我们在WebSocket连接保持活动的情况下,随时让服务器将数据推送到客户端。
回调函数被传递给运行订阅机制的单独线程,这看起来是一个很好的关注点分离,因为回调调用者不需要知道任何关于websockets/actors/akka的信息-它只执行回调函数与更改的结果。

def wsFlow: Flow[Message, Message, _] = {
  // Use queue with 1 message at a time
  val (queue, source) = Source.queue[Message](
    1, OverflowStrategy.backpressure
  ).preMaterialize()

  val sink = Sink.foreach[Message] {
    case BinaryMessage.Strict(querySerialized) =>
      // Deserialize query
      val query = deserialize(querySerialized.asByteBuffer)

      // What to do when new query results emerge
      val callback: List[Any] => Unit = { (result: List[Any]) =>
        val resultBytes  = serialize(result)
        val binaryResult = BinaryMessage(ByteString(resultBytes))

        // Offer result to queue
        // This makes `source` push serialized result to client
        queue.offer(binaryResult)
      }

      // Subscribe to updated query results from db queue in separate thread
      subscribe(query, callback)

    case other => throw new Exception("Unexpected incoming message: " + other)
  }

  Flow.fromSinkAndSource(sink, source)
}

lazy val route: Route = cors() {
  path("ws") {
    handleWebSocketMessages(wsFlow)
  }
}

相关问题