我目前正在使用Akka-HTTP构建一个客户端WebSockets消费者。我不想尝试在Sink中进行解析,而是想将代码 Package 在一个函数中,该函数从Sink发出结果,然后使用该输出(来自函数)进行进一步处理(更多解析......等)。
我目前能够打印来自Sink的每一条消息;但是,函数的返回类型仍然是Unit
。我的目标是从函数中发出一个String
,用于接收器中的每个项,然后使用返回的字符串进行进一步的解析。我有目前为止的代码(注意:它主要是锅炉板)。
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object client extends App {
def parseData(uri: String)(implicit system: ActorSystem, materializer: Materializer): Unit = {
val defaultSettings = ClientConnectionSettings(system)
val pingCounter = new AtomicInteger()
val customWebsocketSettings = defaultSettings.websocketSettings.withPeriodicKeepAliveData(
() => ByteString(s"debug-${pingCounter.incrementAndGet()}")
)
val customSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings)
val outgoing = Source.maybe[Message]
val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
case message: TextMessage.Strict => message.text // I Want to emit/stream this message as a String from the function (or get a handle on it from the outside)
case _ => println("Other")
}
val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest(uri), settings = customSettings)
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(sink)(Keep.both)
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}"
)
}
}
connected.onComplete {
case Success(value) => value
case Failure(exception) => throw exception
}
closed.onComplete { _ =>
println("Retrying...")
parseData(uri)
}
upgradeResponse.onComplete {
case Success(value) => println(value)
case Failure(exception) => throw exception
}
}
}
在一个单独的对象中,我想做解析,比如:
import akka.actor.ActorSystem
import akka.stream.Materializer
import api.client.parseData
object Application extends App {
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = Materializer(system)
val uri = "ws://localhost:8080/foobar"
val res = parseData(uri) // I want to handle the function output here
// parse(res)
println(res)
有没有方法可以从函数外部获得接收器的句柄,或者我需要在接收器中进行任何解析。我主要是试图不使接收器过于复杂。
更新:我还在考虑,向流中添加另一个Flow
元素(它处理解析)是否比从流外部获取值更好。
1条答案
按热度按时间unftdfkk1#
添加一个流元素似乎可以解决您的问题,同时又完全符合习惯。
您必须记住的是,sink语义旨在描述如何“终止”流,因此,尽管它可以描述非常复杂的计算,但它将始终返回一个值,该值仅在流结束时返回。
换句话说,接收器并不是针对每个流元素返回一个值,而是针对每个整个流返回一个值。