import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import spray.json._
import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.{Failure, Success}
object SoftwareRegistry extends App with Formatter {
implicit val system = ActorSystem("NPMRegistry")
implicit val materializer = ActorMaterializer()
case class NPMPackage(name: String)
// reading the packages
val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
val bufferedSource = scala.io.Source.fromFile(filename)
val listOfPackages: List[NPMPackage] = (for (line <- bufferedSource.getLines) yield {
NPMPackage(line.trim)
}).toList
bufferedSource.close()
// requests
val serverHttpRequests = listOfPackages.map(pkg =>
(HttpRequest(
HttpMethods.GET,
uri = Uri(s"/${pkg.name}")
),
UUID.randomUUID().toString)
)
// source
val sourceList = Source(serverHttpRequests)
val bufferedFlow = Flow[(HttpRequest, String)]
.buffer(10, overflowStrategy = OverflowStrategy.backpressure)
.throttle(1, 3 seconds)
val dd = sourceList
.via(bufferedFlow).async
.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
.runForeach {
case (Success(response), oId) =>
println(s"$oId $response")
case (Failure(ex), oId) => println(ex)
}
在上面的代码中,我可以将响应打印到控制台,并且我想知道如何使用实体并以流的方式访问响应中的数据,而不是在将来。
下面是现有代码
的结果
1条答案
按热度按时间s4n0splo1#
基本上,您需要将逻辑保留在Akka Streams API中,而不是像您那样使用
runForEach
终止它。这种情况的简化示例如下所示:
为了简化示例,我使用
flatMapConcat
来获取HttpRespnse
,忽略错误和请求的上下文。然后map
ping获取HttpEntity
,然后flatMapConcat
再次执行以获取表示响应主体的ByteString
。每个HttpRequest
可能会有多个响应主体,我猜你指的是“流媒体”。