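I am using Akka 2.4.7 and want to log the entire HTTP response, with an implementation similar to the one in "How does one log Akka HTTP client requests". The code of interest is the part that extracts the data from the HttpEntity: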
def entityAsString(entity: HttpEntity) (implicit m: Materializer, ex: ExecutionContext): Future[String] = {
entity.dataBytes.map(_.decodeString("UTF-8")).runWith(Sink.head)
}
This works well when the payload of the POST request is small. Starting at about 1 KB, however, an exception is thrown:
java.lang.IllegalStateException: Substream Source cannot be materialized more than once
Question: why does this exception depend on the size of the POST payload, and what are the possible fixes?
Full log message:
2016-08-11 10:15:35,100 ERROR a.a.ActorSystemImpl [undefined]: Error during processing of request HttpRequest(HttpMethod(POST),http://localhost:3001/api/v2/exec,List(User-Agent: curl/7.30.0, Host: localhost:3001, Accept: */*, Expect: 100-continue, Timeout-Access: <function1>),HttpEntity.Default(multipart/form-data; boundary=-------------------------acebdf13572468; charset=UTF-8,5599,Source(SourceShape(StreamUtils$$anon$2.out), CompositeModule [2db5bfef]
Name: unnamed
Modules:
(unnamed) CompositeModule [4aac8b90]
Name: unnamed
Modules:
(SubSource%28EntitySource%29) GraphStage(EntitySource) [073d36ba]
(unnamed) [155dd7c9] copy of GraphStage(OneHundredContinueStage) [40b6c892]
(unnamed) [1b902132] copy of GraphStage(Collect) [75f65c1c]
(limitable) [76375468] copy of CompositeModule [59626a09]
Name: limitable
Modules:
(unnamed) GraphStage(unknown-operation) [1bee846d]
Downstreams:
Upstreams:
MatValue: Ignore
Downstreams:
SubSource.out -> GraphStage.in
GraphStage.out -> Collect.in
Collect.out -> unknown-operation.in
Upstreams:
GraphStage.in -> SubSource.out
Collect.in -> GraphStage.out
unknown-operation.in -> Collect.out
MatValue: Atomic(SubSource%28EntitySource%29[073d36ba])
(unnamed) [77d6c04c] copy of GraphStage(akka.http.impl.util.StreamUtils$$anon$2@30858cb0) [7e073049]
Downstreams:
SubSource.out -> GraphStage.in
GraphStage.out -> Collect.in
Collect.out -> unknown-operation.in
unknown-operation.out -> StreamUtils$$anon$2.in
Upstreams:
GraphStage.in -> SubSource.out
Collect.in -> GraphStage.out
unknown-operation.in -> Collect.out
StreamUtils$$anon$2.in -> unknown-operation.out
MatValue: Atomic(akka.stream.impl.StreamLayout$CompositeModule[4aac8b90]))),HttpProtocol(HTTP/1.1))
java.lang.IllegalStateException: Substream Source cannot be materialized more than once
at akka.stream.impl.fusing.SubSource$$anon$4.setCB(StreamOfStreams.scala:703)
at akka.stream.impl.fusing.SubSource$$anon$4.preStart(StreamOfStreams.scala:713)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:475)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:380)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.create(ActorCell.scala:590)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2 Answers
erhoui1w1#
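I assume that entity.dataBytes has already been consumed for some other purpose before entityAsString is called, or that entityAsString is called twice. In general, the contents of an HttpEntity cannot be reused. The contents of an HttpEntity.Strict, however, can be reused.
rseugnpd2#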
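I found that these problems are still relevant with Akka-http 2.6.4. After looking through some bug reports, this post in particular helped me find a solution: https://github.com/akka/akka-http/issues/73.
However, as mentioned in the reference above, this means that the contents of the file are held in memory instead of being streamed. I therefore consider it a workaround rather than a solution.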
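As an illustration of what "held in memory" can look like for the logging case in the question, here is a minimal sketch that first converts the entity to an HttpEntity.Strict and then reads the text from it. It is not the code from the linked issue or from either answer. The helper name strictWithText and the timeout parameter are made up for this example, and UTF-8 is assumed, as in the question. A likely reason for the size dependence is that small bodies arrive as HttpEntity.Strict, whose data can be read repeatedly, while larger bodies arrive as a streamed entity whose source can be materialized only once.

```scala
import akka.http.scaladsl.model.HttpEntity
import akka.stream.Materializer

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not part of Akka HTTP): buffers the whole entity in
// memory and returns both the strict entity and its text.
// Assumes the body is UTF-8 encoded, as in the question's entityAsString.
def strictWithText(entity: HttpEntity, timeout: FiniteDuration)(
    implicit m: Materializer, ec: ExecutionContext
): Future[(HttpEntity.Strict, String)] =
  entity
    .toStrict(timeout)                               // consumes the original stream exactly once
    .map(strict => (strict, strict.data.utf8String)) // Strict data can be read again later
```

The original entity is consumed by toStrict, so whatever processes the request or response afterwards should be given the returned strict entity (for example via withEntity) rather than the original one. Because the whole body is buffered, this is only reasonable for payloads of bounded size.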
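Also note that I did not observe any difference in behavior between fileUpload and storeUploadedFile.
Workaround: here is an example of my function using storeUploadedFile:
Note that the Unmarshaller is provided implicitly in the following way: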