akka问题子流源(EntitySource)不能多次实体化

doinxwow  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(140)

当我尝试对响应进行两次解组时,我遇到了这个错误,这是因为我需要响应作为case类,我还需要它作为json-string,因为对于对象,我在第一个流中使用它,而在另一个第二个流中使用json-string,所以这就是我需要两者的原因。
错误日志:

21:08:16.174 [Extractor-akka.actor.default-dispatcher-14] DEBUG akka.actor.RepointableActorRef - Aborting tcp connection to api.clickup.com:443 because of upstream failure: java.lang.IllegalStateException: Substream Source(EntitySource) cannot be materialized more than once
akka.stream.impl.fusing.SubSource$$anon$11.createMaterializedTwiceException(StreamOfStreams.scala:833)
akka.stream.impl.fusing.SubSource$$anon$11.<init>(StreamOfStreams.scala:804)

密码:

Http().singleRequest(HttpRequest( method = GET ,uri = s"https://api.xxxxxx.com/task?archived=false&page=${pag}&subtasks=true&include_closed=true" )
        .withHeaders(RawHeader( CLICKUP_AUTH, config.token )))
      .flatMap {
        res =>
          val bodyString:Future[String] = Unmarshal(res).to[String]
          val tasks: Future[TasksItem] = Unmarshal(res).to[TasksItem]

          (tasks zip( bodyString)).map{
            case (a,b) =>
              TaskUnmarshall( a, b)
          }
      }

我怎么能两次解组一个来自akka流的响应并得到两个呢?

fae0ux8s

fae0ux8s1#

问题在于,主体是通过线路的实际字节流,所以这就是为什么你不能使用它们两次,它们只到达一次。
如果您的用例可以将整个主体拉到内存中,那么您可以首先将响应主体设置为strict(将其加载到内存中),然后将其解编集任意次:

Http().singleRequest(HttpRequest(uri = "https://somewhere.com/"))
  .flatMap { response =>
    response.entity.toStrict(5.seconds).map { allBodyInMemory =>
      response.withEntity(allBodyInMemory)
    }
  }.flatMap { inMemoryResponse =>
    val json = Unmarshal(inMemoryResponse).to[String]
    val other = Unmarshal(inMemoryResponse).to[TasksItem]
    json.zip(other)
  }

不确定是否可以使用Akka HTTP Unmarshalling API使用两个下游流,但一般来说,使用someSource.alsoTo(downstream1).runWith(downstream2)之类的Akka Streams是可能的

相关问题