akka 在scala play框架中使用服务器发送事件(SSE)并自动重新连接

hlswsv35  于 2022-11-05  发布在  Scala
关注(0)|答案(1)|浏览(192)

我们如何在scala play框架中使用SSE?我能找到的大多数资源都是用来制作SSE源代码的。我想可靠地监听来自其他服务的SSE事件(使用autoconnect)。最相关的文章是https://doc.akka.io/docs/alpakka/current/sse.html。我实现了这个,但它似乎不起作用(代码如下)。还有我正在使用的事件

@Singleton
class SseConsumer @Inject()((implicit ec: ExecutionContext) {

   implicit val system = ActorSystem()

  val send: HttpRequest => Future[HttpResponse] =  foo
  def foo(x:HttpRequest) = {
    try {
      println("foo")
      val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
      val newHeaders = x.withHeaders(authHeader)
      Http().singleRequest(newHeaders)

    }catch {
      case e:Exception => {
        println("Exception", e.printStackTrace())
        throw e
      }
    }
  }

  val eventSource: Source[ServerSentEvent, NotUsed] =
    EventSource(
      uri = Uri("https://abc/v1/events"),
      send,
      initialLastEventId = Some("2"),
      retryDelay = 1.second
    )

  def orderStatusEventStable() = {
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource
        .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(10)
        .runWith(Sink.seq)
    events.map(_.foreach( x => {
      println("456")
      println(x.data)
    }))
  }

  Future {
    blocking{
      while(true){
        try{
          Thread.sleep(2000)
          orderStatusEventStable()
        } catch {
          case e:Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }

}

这不会给予任何异常,并且println(“456”)永远不会被打印。

编辑

Future {
    blocking {
      while(true){
        try{
          Await.result(orderStatusEventStable() recover {
            case e: Exception => {
              println("exception", e)
              throw e
            }
          }, Duration.Inf)
        } catch {
          case e:Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }

添加了一个等待,它开始工作。能够阅读10个消息在同一时间。但现在我面临着另一个问题。我有一个生产者,有时可以生产快于我可以消费和与此代码我有2个问题:
1.我必须等到有10条消息可用。我们怎么能最多接收10条消息,最少接收0条消息?
1.当生产率〉消耗率时,我丢失了一些事件。我猜这是由于节流造成的。我们如何使用背压处理它?

bmvo0sr5

bmvo0sr51#

您的代码中的问题是events: Future仅在流(eventSource)完成时才完成。
我不熟悉SSE,但在您的情况下,流可能永远不会完成,因为它总是在侦听新事件。
您可以在Akka Stream文档中了解更多信息。
根据您要对事件执行的操作,您可以只在流上执行map,如下所示:

eventSource
  ...
  .map(/* do something */)
  .runWith(...)

基本上,您需要在数据通过Akka Stream Source时使用它,但不要等待它完成。
编辑:我没有注意到take(10),我的答案只适用于take不在这里的情况。发送10个事件后,您的代码应该可以工作。

相关问题