akka: limiting the size of a stream backed by a URL or local file

8zzbczxx · asked 9 months ago

I have an akka-http API where users send an S3 URL to the server. The server then starts a stream from the AWS server and performs further operations on the source. However, I want to validate the size of the incoming stream before performing any of those operations. Since we cannot use the withSizeLimit akka-http directive here, I created a custom implementation for it.

import scala.annotation.nowarn

import akka.http.scaladsl.model.EntityStreamSizeException
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

final case class SizeLimit(maxBytes: Long, contentLength: Option[Long] = None) extends Attributes.Attribute {
  def isDisabled = maxBytes < 0
}

object Limitable {
  def applyForByteStrings[Mat](source: Source[ByteString, Mat], limit: SizeLimit): Source[ByteString, Mat] =
    applyLimit(source, limit)(_.size)

  def applyForChunks[Mat](source: Source[ChunkStreamPart, Mat], limit: SizeLimit): Source[ChunkStreamPart, Mat] =
    applyLimit(source, limit)(_.data.size)

  def applyLimit[T, Mat](source: Source[T, Mat], limit: SizeLimit)(sizeOf: T => Int): Source[T, Mat] =
    if (limit.isDisabled) source withAttributes Attributes(limit) // no need to add stage, it's either there or not needed
    else source.via(new Limitable(sizeOf)) withAttributes Attributes(limit)

  private val limitableDefaults = Attributes.name("limitable")
}

final class Limitable[T](sizeOf: T => Int) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("Limitable.in")
  val out = Outlet[T]("Limitable.out")
  // debug counters (note: they live on the stage itself, so they are shared across materializations)
  var numPullCalls = 0
  var numPushCalls = 0
  override val shape = FlowShape.of(in, out)
  override protected val initialAttributes: Attributes = Limitable.limitableDefaults

  override def createLogic(_attributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
    private var maxBytes = -1L
    private var bytesLeft = Long.MaxValue

    @nowarn("msg=deprecated") // we need getFirst semantics
    override def preStart(): Unit = {
      _attributes.getFirst[SizeLimit] match {
        case Some(limit: SizeLimit) if limit.isDisabled =>
        // "no limit"
        case Some(SizeLimit(bytes, cl @ Some(contentLength))) =>
          if (contentLength > bytes) failStage(EntityStreamSizeException(bytes, cl))
        // else we still count but never throw an error
        case Some(SizeLimit(bytes, None)) =>
          maxBytes = bytes
          bytesLeft = bytes
        case None =>
      }
    }

    override def onPush(): Unit = {
      numPushCalls += 1
      println(s"Push calls $numPushCalls")
      val elem = grab(in)
      val temp = sizeOf(elem)
      println(s"Elem size is $temp")
      bytesLeft -= temp
      if (bytesLeft >= 0) {
        push(out, elem)
      }
      else {
        println(s"EntityStreamSizeException Bytes left $bytesLeft")
        failStage(EntityStreamSizeException(maxBytes))
      }
    }

    override def onPull(): Unit = {
      numPullCalls += 1
      println(s"Pull calls $numPullCalls")
      pull(in)
    }

    setHandlers(in, out, this)
  }
}

// usage example: assumes an implicit ActorSystem is in scope so runWith can materialize the stream
import java.nio.file.Paths
import akka.stream.scaladsl.{FileIO, StreamConverters}
import org.apache.tika.io.TikaInputStream

val filePath = Paths.get("/Users/<username>/Documents/bigfile.pdf")
val fileSource: Source[ByteString, Any] = FileIO.fromPath(filePath)
val res = Limitable.applyForByteStrings(fileSource, SizeLimit(4000000L))
val sink   = StreamConverters.asInputStream()
val result = res.runWith(sink)
val tis = TikaInputStream.get(result)

The custom implementation is based on this reference: https://github.com/akka/akka-http/blob/main/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala
With this custom implementation in place, I expect the server to throw an EntityStreamSizeException when the file is larger than 4 MB, but it does not throw any exception.
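For reference, the built-in directive is normally applied to the incoming request entity of a route, roughly as in the minimal sketch below (the route itself is hypothetical); it only limits the request entity, not a stream the server opens against S3 on its own, which is why a custom stage is used here.

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// hypothetical route just to illustrate the directive
val route: Route =
  withSizeLimit(4000000L) {
    extractRequestEntity { entity =>
      // entity.dataBytes is capped at 4 MB here, but only for the request entity itself
      complete(StatusCodes.OK)
    }
  }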

cl25kdpy1#

The documentation says:

Demand flowing upstream leading to elements flowing downstream.

You are not consuming the output stream anywhere, so it is limited to processing 16 elements, which is the default Sink buffer size.

val fileSource: Source[ByteString, Any] = FileIO.fromPath(filePath)
val res = Limitable.applyForByteStrings(fileSource, SizeLimit(4000000L))
res.runForeach(println) // this makes the sink consume the stream and keep pulling elements through the GraphStage
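If you only need the size check and not the printed chunks, any sink that keeps demanding elements will do. A minimal sketch, assuming the implicit ActorSystem in scope is called system: run the limited source into Sink.ignore, and the materialized Future fails with the EntityStreamSizeException once the limit is exceeded.

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import akka.stream.scaladsl.Sink

implicit val ec: ExecutionContext = system.dispatcher // assumes the implicit ActorSystem is named `system`

// Sink.ignore keeps demanding elements, so every chunk flows through the Limitable stage
val done = res.runWith(Sink.ignore)

done.onComplete {
  case Failure(e: EntityStreamSizeException) => println(s"stream rejected: $e")
  case Failure(other)                        => println(s"stream failed: $other")
  case Success(_)                            => println("file is within the size limit")
}

With the asInputStream sink from the question, demand only flows while something actually reads from the returned InputStream, so merely wrapping it with TikaInputStream.get never pulls enough data to trip the limit.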
