Akka流持续消耗WebSocket

k10s72fa  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(158)

我对Scala和Akka Stream有点陌生,我试图从WebSocket中获取JSON字符串消息,并将它们推到Kafka主题中。
现在我只在“从WS获取消息”部分工作。
来自WebSocket的消息如下所示:

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

我想将此JSON消息拆分为多个消息:

{"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

然后把这些信息都推到Kafka的主题上。
以下是我目前取得的成果:

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

它正在工作,我得到了预期的输出Json消息,但是我想知道我是否可以用一种更“Akka式”的风格来编写这个生成器,比如使用GraphDSL。所以我有几个问题:

  • 使用GraphDSL可以连续使用WebSocket吗?如果可以,请给我举个例子好吗?
  • 使用GraphDSL使用WS是个好主意吗?
  • 我应该像我一样分解接收到的Json消息,然后再把它发送给Kafka吗?还是最好直接发送,这样延迟更低?
  • 在给Kafka写了这封信之后,我打算用Apache Storm来使用它,这是个好主意吗?还是我应该坚持用Akka?

谢谢你阅读我,问候,Arès

jgovgodb

jgovgodb1#

这段代码很像 akka 语:scaladslGraphDSL一样是Akka,或者实现一个自定义的GraphStage。IMO/E,使用GraphDSL的唯一原因是,如果图形的实际形状在scaladsl中不容易表达。
我个人会定义一个CoinPrice类,使模型显式化。

case class CoinPrice(coin: String, price: BigDecimal)

然后有一个Flow[Message, CoinPrice, NotUsed],它将1个传入消息解析为零个或多个CoinPrice

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

根据消息中JSON的大小,您可能希望将其分为不同的流阶段,以允许JSON解析和提取到CoinPrice之间的异步边界。例如,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

在上面的示例中,async边界的任一侧上的阶段将在单独的参与者中执行,并且因此可能并发地执行(如果有足够的CPU内核等),以参与者协调和交换消息的额外开销为代价。该额外的协调/通信开销(参见Gunther的通用可伸缩性定律)只有在JSON对象足够大并且进入速度足够快(在前一个对象完成处理之前持续进入)的情况下,才值得这么做。
如果您的目的是使用WebSocket直到程序停止,您可能会发现使用Source.never[Message]会更清楚。

相关问题