当我尝试对响应进行两次解组时,我遇到了这个错误,这是因为我需要响应作为case类,我还需要它作为json-string,因为对于对象,我在第一个流中使用它,而在另一个第二个流中使用json-string,所以这就是我需要两者的原因。
错误日志:
21:08:16.174 [Extractor-akka.actor.default-dispatcher-14] DEBUG akka.actor.RepointableActorRef - Aborting tcp connection to api.clickup.com:443 because of upstream failure: java.lang.IllegalStateException: Substream Source(EntitySource) cannot be materialized more than once
akka.stream.impl.fusing.SubSource$$anon$11.createMaterializedTwiceException(StreamOfStreams.scala:833)
akka.stream.impl.fusing.SubSource$$anon$11.<init>(StreamOfStreams.scala:804)
密码:
Http().singleRequest(HttpRequest( method = GET ,uri = s"https://api.xxxxxx.com/task?archived=false&page=${pag}&subtasks=true&include_closed=true" )
.withHeaders(RawHeader( CLICKUP_AUTH, config.token )))
.flatMap {
res =>
val bodyString:Future[String] = Unmarshal(res).to[String]
val tasks: Future[TasksItem] = Unmarshal(res).to[TasksItem]
(tasks zip( bodyString)).map{
case (a,b) =>
TaskUnmarshall( a, b)
}
}
我怎么能两次解组一个来自akka流的响应并得到两个呢?
1条答案
按热度按时间fae0ux8s1#
问题在于,主体是通过线路的实际字节流,所以这就是为什么你不能使用它们两次,它们只到达一次。
如果您的用例可以将整个主体拉到内存中,那么您可以首先将响应主体设置为strict(将其加载到内存中),然后将其解编集任意次:
不确定是否可以使用Akka HTTP Unmarshalling API使用两个下游流,但一般来说,使用
someSource.alsoTo(downstream1).runWith(downstream2)
之类的Akka Streams是可能的