bounty将在2天后过期。回答此问题可获得+200声望奖励。Marc Grue希望引起更多人关注此问题。
我想建立一个类似于GraphQl的订阅,客户端可以通过WebSocket请求Akka Http服务器订阅数据库更新的结果,并通过websocket将它们推回到客户端。我的数据库通过java BlockingQueue发出事务,当事务数据与查询匹配时,一个新的结果应该被推送。我正在一个单独的线程中设置阻塞队列,希望它能通过Akka事件总线发送回更新。
但我是一个Akka新手,并且坚持使用我的设置,Sink甚至没有接收到来自客户端的带有查询的传入消息。我的思路正确吗?或者您可以帮助我找到正确的方向吗?
这是我通过阅读文档和浏览各种WebSocket示例到目前为止所能整理出来的(简化的):
implicit val system : ActorSystem = ActorSystem()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
Http().newServerAt("localhost", 8080).bind(route)
lazy val route: Route = cors() {
path("ws") {
handleWebSocketMessages(
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
)
}
}
接收来自客户端的查询。显然这不是创建接收器的正确方法,因为没有接收到传入的消息。
val incomingMessages: Sink[Message, Future[Done]] = {
Sink.foreach {
case BinaryMessage.Strict(argsSerialized) =>
println("Never gets here...")
// Deserialize incoming query
val query = deserialize(argsSerialized.asByteBuffer)
// What to do when new query results emerge
val callback: List[Any] => Unit = { (result: List[Any]) =>
val resultBytes = serialize(result)
val binaryMessage = BinaryMessage(ByteString(resultBytes))
// Publish serialized results to Akka eventbus
system.eventStream.publish(binaryMessage)
}
// Subscribe to updated query results from queue in separate thread
subscribe(query, callback)
case other => throw new Exception("Unexpected incoming message: " + other)
}
}
在更改与查询匹配时发回更新的查询结果:
val outgoingMessages: Source[Message, Any] = {
Source.lazySource { () =>
val (actorRef, itemSource) = Source
.actorRef[BinaryMessage](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
1000,
OverflowStrategy.backpressure // ?
)
.preMaterialize()
system.eventStream.subscribe(actorRef, classTag[BinaryMessage].runtimeClass)
itemSource
}
}
在每个新的数据库事务之间分离线程的永久循环和阻塞
def subscribe(query: String, callback: List[Data] => Unit): Unit = {
object TxReportWatcher extends Runnable {
override def run(): Unit = {
while (true) {
try {
// `take` blocks until new data is transacted
matchResult(query, javaBlockingQueue.take).foreach(machingResult =>
// Publish new result to the Akka eventbus
callback(machingResult)
)
} catch {
case e: InterruptedException => e.printStackTrace()
}
}
}
}
Executors.newSingleThreadExecutor().execute(TxReportWatcher)
}
1条答案
按热度按时间wydwbb8l1#
找到了一个解决方案。可以使用Akka Source.queue将更改的结果推回客户端,而不是使用Akka事件总线来通知更改。
新的结果被序列化并以二进制消息的形式提供给
queue
,每次发生这种情况时,source
都会触发将消息推回客户端,这允许我们在WebSocket连接保持活动的情况下,随时让服务器将数据推送到客户端。回调函数被传递给运行订阅机制的单独线程,这看起来是一个很好的关注点分离,因为回调调用者不需要知道任何关于websockets/actors/akka的信息-它只执行回调函数与更改的结果。