我们如何在scala play框架中使用SSE?我能找到的大多数资源都是用来制作SSE源代码的。我想可靠地监听来自其他服务的SSE事件(使用autoconnect)。最相关的文章是https://doc.akka.io/docs/alpakka/current/sse.html。我实现了这个,但它似乎不起作用(代码如下)。还有我正在使用的事件
@Singleton
class SseConsumer @Inject()((implicit ec: ExecutionContext) {
implicit val system = ActorSystem()
val send: HttpRequest => Future[HttpResponse] = foo
def foo(x:HttpRequest) = {
try {
println("foo")
val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
val newHeaders = x.withHeaders(authHeader)
Http().singleRequest(newHeaders)
}catch {
case e:Exception => {
println("Exception", e.printStackTrace())
throw e
}
}
}
val eventSource: Source[ServerSentEvent, NotUsed] =
EventSource(
uri = Uri("https://abc/v1/events"),
send,
initialLastEventId = Some("2"),
retryDelay = 1.second
)
def orderStatusEventStable() = {
val events: Future[immutable.Seq[ServerSentEvent]] =
eventSource
.throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
.take(10)
.runWith(Sink.seq)
events.map(_.foreach( x => {
println("456")
println(x.data)
}))
}
Future {
blocking{
while(true){
try{
Thread.sleep(2000)
orderStatusEventStable()
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
}
这不会给予任何异常,并且println(“456”)永远不会被打印。
编辑:
Future {
blocking {
while(true){
try{
Await.result(orderStatusEventStable() recover {
case e: Exception => {
println("exception", e)
throw e
}
}, Duration.Inf)
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
添加了一个等待,它开始工作。能够阅读10个消息在同一时间。但现在我面临着另一个问题。我有一个生产者,有时可以生产快于我可以消费和与此代码我有2个问题:
1.我必须等到有10条消息可用。我们怎么能最多接收10条消息,最少接收0条消息?
1.当生产率〉消耗率时,我丢失了一些事件。我猜这是由于节流造成的。我们如何使用背压处理它?
1条答案
按热度按时间bmvo0sr51#
您的代码中的问题是
events: Future
仅在流(eventSource
)完成时才完成。我不熟悉SSE,但在您的情况下,流可能永远不会完成,因为它总是在侦听新事件。
您可以在Akka Stream文档中了解更多信息。
根据您要对事件执行的操作,您可以只在流上执行
map
,如下所示:基本上,您需要在数据通过Akka Stream
Source
时使用它,但不要等待它完成。编辑:我没有注意到
take(10)
,我的答案只适用于take
不在这里的情况。发送10个事件后,您的代码应该可以工作。