akka 如何从接收器发出消息并将它们传递给另一个函数?

vbopmzt1  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(113)

我目前正在使用Akka-HTTP构建一个客户端WebSockets消费者。我不想尝试在Sink中进行解析,而是想将代码 Package 在一个函数中,该函数从Sink发出结果,然后使用该输出(来自函数)进行进一步处理(更多解析......等)。
我目前能够打印来自Sink的每一条消息;但是,函数的返回类型仍然是Unit。我的目标是从函数中发出一个String,用于接收器中的每个项,然后使用返回的字符串进行进一步的解析。我有目前为止的代码(注意:它主要是锅炉板)。

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object client extends App {

  def parseData(uri: String)(implicit system: ActorSystem, materializer: Materializer): Unit = {

    val defaultSettings = ClientConnectionSettings(system)
    val pingCounter = new AtomicInteger()
    val customWebsocketSettings = defaultSettings.websocketSettings.withPeriodicKeepAliveData(
      () => ByteString(s"debug-${pingCounter.incrementAndGet()}")
    )

    val customSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings)

    val outgoing = Source.maybe[Message]

    val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
      case message: TextMessage.Strict => message.text // I Want to emit/stream this message as a String from the function (or get a handle on it from the outside)
      case _ => println("Other")
    }

    val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
      Http().webSocketClientFlow(WebSocketRequest(uri), settings = customSettings)

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right)
        .toMat(sink)(Keep.both)
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

    connected.onComplete {
      case Success(value) => value
      case Failure(exception) => throw exception
    }

    closed.onComplete { _ =>
      println("Retrying...")
      parseData(uri)
    }

    upgradeResponse.onComplete {
      case Success(value) => println(value)
      case Failure(exception) => throw exception
    }
  }
}

在一个单独的对象中,我想做解析,比如:

import akka.actor.ActorSystem
import akka.stream.Materializer
import api.client.parseData

object Application extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: Materializer = Materializer(system)
  val uri = "ws://localhost:8080/foobar"

  val res = parseData(uri) // I want to handle the function output here 
  // parse(res)
  println(res)

有没有方法可以从函数外部获得接收器的句柄,或者我需要在接收器中进行任何解析。我主要是试图不使接收器过于复杂。
更新:我还在考虑,向流中添加另一个Flow元素(它处理解析)是否比从流外部获取值更好。

unftdfkk

unftdfkk1#

添加一个流元素似乎可以解决您的问题,同时又完全符合习惯。
您必须记住的是,sink语义旨在描述如何“终止”流,因此,尽管它可以描述非常复杂的计算,但它将始终返回一个值,该值仅在流结束时返回。
换句话说,接收器并不是针对每个流元素返回一个值,而是针对每个整个流返回一个值。

相关问题