我对Scala和Akka Stream有点陌生,我试图从WebSocket中获取JSON字符串消息,并将它们推到Kafka主题中。
现在我只在“从WS获取消息”部分工作。
来自WebSocket的消息如下所示:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
我想将此JSON消息拆分为多个消息:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
然后把这些信息都推到Kafka的主题上。
以下是我目前取得的成果:
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
它正在工作,我得到了预期的输出Json消息,但是我想知道我是否可以用一种更“Akka式”的风格来编写这个生成器,比如使用GraphDSL。所以我有几个问题:
- 使用GraphDSL可以连续使用WebSocket吗?如果可以,请给我举个例子好吗?
- 使用GraphDSL使用WS是个好主意吗?
- 我应该像我一样分解接收到的Json消息,然后再把它发送给Kafka吗?还是最好直接发送,这样延迟更低?
- 在给Kafka写了这封信之后,我打算用Apache Storm来使用它,这是个好主意吗?还是我应该坚持用Akka?
谢谢你阅读我,问候,Arès
1条答案
按热度按时间jgovgodb1#
这段代码很像 akka 语:
scaladsl
和GraphDSL
一样是Akka,或者实现一个自定义的GraphStage
。IMO/E,使用GraphDSL
的唯一原因是,如果图形的实际形状在scaladsl
中不容易表达。我个人会定义一个
CoinPrice
类,使模型显式化。然后有一个
Flow[Message, CoinPrice, NotUsed]
,它将1个传入消息解析为零个或多个CoinPrice
。根据消息中JSON的大小,您可能希望将其分为不同的流阶段,以允许JSON解析和提取到
CoinPrice
之间的异步边界。例如,在上面的示例中,
async
边界的任一侧上的阶段将在单独的参与者中执行,并且因此可能并发地执行(如果有足够的CPU内核等),以参与者协调和交换消息的额外开销为代价。该额外的协调/通信开销(参见Gunther的通用可伸缩性定律)只有在JSON对象足够大并且进入速度足够快(在前一个对象完成处理之前持续进入)的情况下,才值得这么做。如果您的目的是使用WebSocket直到程序停止,您可能会发现使用
Source.never[Message]
会更清楚。