akka 使用分块传输编码从scala Play服务器流式传输case类对象

xbp102n0  于 2022-11-06  发布在  Scala
关注(0)|答案(1)|浏览(155)

所以,我使用Play framework 2.7来设置一个流媒体服务器。我尝试做的是流大约500个大小相似的自定义case类对象。
这是产生流的控制器的一部分-

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products))
  }

其中Product是我正在使用的自定义case类。一个隐式writable将这个对象反序列化为一个json。
这是处理该流的控制器的一部分-

def process(): Action[AnyContent] = Action.async {
    val request = ws.url(STREAMING_URL).withRequestTimeout(Duration.Inf).withMethod("GET")
    request.stream().flatMap {
      _.bodyAsSource
        .map(_.utf8String)
        .map { x => println(x); x }
        .fold(0) { (acc, _) => acc + 1 }
        .runWith(Sink.last)
        .andThen {
          case Success(v) => println(s"Total count - $v")
          case Failure(_) => println("Error encountered")
        }
    }.map(_ => Ok)
  }

我所期望的是case类的每个对象都作为一个单独的块来传输和接收,这样它们就可以被单独序列化并被接收方使用。这意味着,使用上面的代码,我的期望是我应该正好接收500个块,但这个值总是比这个值多。
我能看到的是,在这500个对象中,正好有一个对象是以拆分的方式传输的,并以2个块而不是1个块传输。
这是一个正常的物体,从接收端看-

{
  "id" : 494,
  "name" : "some random string"
}

这是一个一分为二的物体

{
  "id" : 463,
  "name" : "some random strin
g"
}

因此,这不能被序列化回我的Product case类的示例中。
但是,如果我在发送方控制器中对源进行了某种限制,那么我会按预期收到数据块。
例如,如果我每秒只传输5个元素,这就完全可以了-

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products).throttle(5, 1.second))
  }

谁能帮我理解为什么会发生这种情况?

pzfprimi

pzfprimi1#

here所述,有一个JsonFraming将有效的JSON对象与传入的ByteString流分开。
对你来说,你可以试试这种方法

_.bodyAsSource.via(JsonFraming.objectScanner(Int.MaxValue)).map(_.utf8String)

相关问题