如果实体大小>1 K,则Akka-Stream、日志记录、实体化流失败

lbsnaicq  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(159)

使用Akka 2.4.7。我想记录整个Http响应。使用类似于How does one log Akka HTTP client requests的实现。需要关注的代码是从HttpEntity中提取数据的代码

def entityAsString(entity: HttpEntity) (implicit m: Materializer, ex: ExecutionContext): Future[String] = {
    entity.dataBytes.map(_.decodeString("UTF-8")).runWith(Sink.head)
}

如果POST请求的有效负载很小,这种方法就很有效。但是从1K开始,就有一个例外:

java.lang.IllegalStateException: Substream Source cannot be materialized more than once

问题:为什么这个异常依赖于POST有效负载的大小。希望有什么可能的修复方法吗?

完整日志消息:

2016-08-11 10:15:35,100 ERROR a.a.ActorSystemImpl [undefined]: Error during processing of request HttpRequest(HttpMethod(POST),http://localhost:3001/api/v2/exec,List(User-Agent: curl/7.30.0, Host: localhost:3001, Accept: */*, Expect: 100-continue, Timeout-Access: <function1>),HttpEntity.Default(multipart/form-data; boundary=-------------------------acebdf13572468; charset=UTF-8,5599,Source(SourceShape(StreamUtils$$anon$2.out), CompositeModule [2db5bfef]  
  Name: unnamed  
  Modules:  
    (unnamed) CompositeModule [4aac8b90]  
      Name: unnamed  
      Modules:  
        (SubSource%28EntitySource%29) GraphStage(EntitySource) [073d36ba]  
        (unnamed) [155dd7c9] copy of GraphStage(OneHundredContinueStage) [40b6c892]  
        (unnamed) [1b902132] copy of GraphStage(Collect) [75f65c1c]  
        (limitable) [76375468] copy of CompositeModule [59626a09]  
          Name: limitable  
          Modules:  
            (unnamed) GraphStage(unknown-operation) [1bee846d]  
          Downstreams:   
          Upstreams:   
          MatValue: Ignore  
      Downstreams:   
        SubSource.out -> GraphStage.in  
        GraphStage.out -> Collect.in  
        Collect.out -> unknown-operation.in  
      Upstreams:   
        GraphStage.in -> SubSource.out  
        Collect.in -> GraphStage.out  
        unknown-operation.in -> Collect.out  
      MatValue: Atomic(SubSource%28EntitySource%29[073d36ba])  
    (unnamed) [77d6c04c] copy of GraphStage(akka.http.impl.util.StreamUtils$$anon$2@30858cb0) [7e073049]  
  Downstreams:   
    SubSource.out -> GraphStage.in  
    GraphStage.out -> Collect.in  
    Collect.out -> unknown-operation.in  
    unknown-operation.out -> StreamUtils$$anon$2.in  
  Upstreams:   
    GraphStage.in -> SubSource.out  
    Collect.in -> GraphStage.out  
    unknown-operation.in -> Collect.out  
    StreamUtils$$anon$2.in -> unknown-operation.out  
  MatValue: Atomic(akka.stream.impl.StreamLayout$CompositeModule[4aac8b90]))),HttpProtocol(HTTP/1.1))  
java.lang.IllegalStateException: Substream Source cannot be materialized more than once  
    at akka.stream.impl.fusing.SubSource$$anon$4.setCB(StreamOfStreams.scala:703)  
    at akka.stream.impl.fusing.SubSource$$anon$4.preStart(StreamOfStreams.scala:713)  
    at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:475)  
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:380)  
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)  
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)  
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)  
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)  
    at akka.actor.ActorCell.create(ActorCell.scala:590)  
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)  
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)  
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)  
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)  
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)  
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)  
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)  
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
erhoui1w

erhoui1w1#

我假设在调用entityAsString之前,entity.dataBytes已经被用于某个有用的目的,或者entityAsString被调用了两次。一般情况下,HttpEntity的内容不能被重用。但是,HttpEntity.Strict的内容可以被重用。

rseugnpd

rseugnpd2#

我发现这些问题仍然与Akka-http 2.6.4相关,在查看了一些bug报告后,这篇文章特别帮助我找到了https://github.com/akka/akka-http/issues/73的解决方案。
但是,在上面的参考文献中提到,这意味着文件的内容存储在内存中,而不是使用流。因此,我认为这是一种变通方法,而不是解决方案。
另外请注意,我没有发现fileUploadstoreUploadedFile之间的行为有任何不同。
解决方法:以下是我的函数示例

def createTestUploadWithStrict = toStrictEntity(3.seconds) {
    (withoutSizeLimit & 
     post & 
     pathPrefix("test") & 
     fileUpload("data") & 
     formField("f1".as[MyCustomFormFiled])){ 
         case ((metadata: FileInfo, fileStream: Source[ByteString, Any]), di:MyCustomFormFiled) => {

    // Save the file
    val file = tempDestination(metadata)
    val sink = FileIO.toPath(file.toPath)
    val writeResultFut = fileStream.runWith(FileIO.toPath(file.toPath))
    val result = ???

    // file is written to file
    onComplete(writeResultFut) {
        case Success(_) =>
          complete(200 -> s"Working fine with data $result")
        case Failure(e) =>
          complete(500 -> s"Error while writing data file: $e")
      }
    }
  }
}

使用storeUploadedFile

def createTestUploadWithStrict = toStrictEntity(3.seconds) {
    (withoutSizeLimit & 
     post & 
     pathPrefix("test2") & 
     storeUploadedFile("data", tempDestination) & 
     formField("device".as[MyCustomFormFiled])){ 
(metadata: FileInfo, file: File, di:MyCustomFormFiled) =>
    val result = ???
    complete(200 -> s"Working fine with data $result")
    }
  }

请注意,Unmarshaller是以下列方式隐含提供的:

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport                                                                                                                      
import spray.json._  

trait JsonSupport extends SprayJsonSupport with  DefaultJsonProtocol{
    implicit val mycustomFormFieldFormat = jsonFormat2(MyCustomFormFiled)                                                                                                                        }

相关问题