如何在Akka Http中使用HttpResponse

t5fffqht  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(173)
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import spray.json._

import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.{Failure, Success}

object SoftwareRegistry extends App with Formatter {

  implicit val system = ActorSystem("NPMRegistry")
  implicit val materializer = ActorMaterializer()

  case class NPMPackage(name: String)

  // reading the packages
  val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
  val bufferedSource = scala.io.Source.fromFile(filename)
  val listOfPackages: List[NPMPackage] = (for (line <- bufferedSource.getLines) yield {
    NPMPackage(line.trim)
  }).toList
  bufferedSource.close()

  // requests
  val serverHttpRequests = listOfPackages.map(pkg =>
    (HttpRequest(
      HttpMethods.GET,
      uri = Uri(s"/${pkg.name}")
    ),
      UUID.randomUUID().toString)
  )

  // source
  val sourceList = Source(serverHttpRequests)
  val bufferedFlow = Flow[(HttpRequest, String)]
    .buffer(10, overflowStrategy = OverflowStrategy.backpressure)
    .throttle(1, 3 seconds)

  val dd = sourceList
    .via(bufferedFlow).async
    .via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
    .runForeach {
      case (Success(response), oId) =>
        println(s"$oId $response")
      case (Failure(ex), oId) => println(ex)
    }

在上面的代码中,我可以将响应打印到控制台,并且我想知道如何使用实体并以流的方式访问响应中的数据,而不是在将来。
下面是现有代码

的结果

s4n0splo

s4n0splo1#

基本上,您需要将逻辑保留在Akka Streams API中,而不是像您那样使用runForEach终止它。
这种情况的简化示例如下所示:

.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
    .flatMapConcat {
      case (Success(response), _) => Source.single(response)
      case (Failure(_), _) => Source.empty //warning, ignoring errors
    }
    .map(httpResponse => httpResponse.entity)
    .flatMapConcat(e => e.getDataBytes().map(bytes => ???))
    .runWith(Sink.ignore)

为了简化示例,我使用flatMapConcat来获取HttpRespnse,忽略错误和请求的上下文。然后map ping获取HttpEntity,然后flatMapConcat再次执行以获取表示响应主体的ByteString。每个HttpRequest可能会有多个响应主体,我猜你指的是“流媒体”。

相关问题