Java Akka Streams: StreamDetachedException when offering to QueueSource

w8f9ii69 posted on 2023-04-28 in Java

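We have a DownloadFileFlow class that uses Akka Streams and Akka HTTP to download files from different domains. Sometimes (about 50% of the time), when offering to the QueueSource, we immediately get a StreamDetachedException: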

java.util.concurrent.CompletionException: akka.stream.StreamDetachedException: Stage with GraphStageLogic akka.stream.impl.QueueSource$$anon$1-queueSource stopped before async invocation was processed
   at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
   at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:747)
   at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735)
   at java.base/java.util.concurrent.CompletableFuture.thenAcceptAsync(CompletableFuture.java:2186)
   at scala.concurrent.java8.FuturesConvertersImpl$CF.thenAccept(FutureConvertersImpl.scala:29)
   at scala.concurrent.java8.FuturesConvertersImpl$CF.thenAccept(FutureConvertersImpl.scala:18)
   at x.y.z.DownloadFileFlow.offer(DownloadFileFlow.java:60)

We don't know what causes the error. We suspect the postStop method in akka.stream.impl.QueueSource, but we don't know why the stage is being stopped:

override def postStop(): Unit = {
  val exception = new StreamDetachedException()
  completion.tryFailure(exception)
}

Could the cause be a resource leak somewhere in our code? For example, not consuming the http-response-entity in some cases?

1. SourceQueue reference:
AtomicReference<SourceQueueWithComplete<FileDownloadEnvelope>> queueRef = new AtomicReference<>();

2. Definition:

RestartSource.onFailuresWithBackoff(
        RestartSettings.create(props.getRecoverMinBackOff(), props.getRecoverMaxBackOff(), RANDOM_FACTOR),
        () -> Source.<FileDownloadEnvelope>queue(props.getBufferSize(), OverflowStrategy.backpressure(), props.getMaxConcurrentOffers())
            .filter(this::checkExpiration)
            .map(x -> Pair.create(createContext(x), x))
            .flatMapConcat(this::authIfNeeded)
            .mapAsyncUnordered(props.getParallelism(), this::process)
            .map(this::reply)
            .mapMaterializedValue(x -> {
                this.queueRef.set(x);
                return x;
            }))
    .runWith(Sink.ignore(), actorSystem);

3. Access (the offer method):

public void offer(FileDownloadEnvelope envelope) {
    queueRef.get().offer(envelope)
        .thenAccept(x -> {
            if (!x.isEnqueued())
                replyError(envelope);
        }).exceptionally(e -> { // StreamDetachedException is caught here
            replyError(envelope);
            return null;
        });
}

Akka HTTP properties:

akka.http.host-connection-pool.min-connections: "10"
akka.http.host-connection-pool.max-connections: "2000"
akka.http.host-connection-pool.max-open-requests: "4096"
akka.http.host-connection-pool.max-retries: "0"
akka.http.host-connection-pool.client.connecting-timeout: 2s
akka.http.host-connection-pool.client.idle-timeout: 5s

Stream properties:

BUFFER-SIZE: "2000"
MAX-CONCURRENT-OFFERS: "2000"
PARALLELISM: "70"
RECOVER-MIN-BACK-OFF: 100ms
RECOVER-MAX-BACK-OFF: 500ms

We use akka & akka-stream 2.6.19 and akka-http 10.2.0.

u5rb5r59 (answer #1)

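The error was caused by the way one specific failure was handled inside the process method. In that case the handler threw an exception, and throwing it evidently failed the stream: all elements still in the queue were dropped with a StreamDetachedException. Presumably the failed CompletionStage failed the mapAsyncUnordered stage, the running QueueSource was stopped, and offers that were still pending (or made against the old queue reference before the restart) were rejected.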

.exceptionally(e -> {
    // ...
    else if (e.getCause() instanceof FooException) {
        throw new RuntimeException(e); // cause
    }
    // ....
});

Returning an error object instead of throwing the exception solved the problem.
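
The fix can be illustrated with plain JDK futures. The following is only a minimal sketch, not the original code: DownloadResult, download and the URLs are hypothetical names invented for the example, and FooException is a stand-in for the exception type mentioned above. It shows the exceptionally handler returning a failure value, so the CompletionStage handed to the stream completes normally and does not fail the stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class ErrorAsValueSketch {

    /** Hypothetical stand-in for the FooException mentioned in the answer. */
    static class FooException extends RuntimeException {
        FooException(String message) { super(message); }
    }

    /** Hypothetical result type: either a success payload or an error description. */
    static final class DownloadResult {
        final boolean success;
        final String detail;

        private DownloadResult(boolean success, String detail) {
            this.success = success;
            this.detail = detail;
        }

        static DownloadResult ok(String payload) { return new DownloadResult(true, payload); }
        static DownloadResult failed(String reason) { return new DownloadResult(false, reason); }
    }

    /** Hypothetical download step; fails for any URL containing "bad". */
    static CompletionStage<String> download(String url) {
        return CompletableFuture.supplyAsync(() -> {
            if (url.contains("bad")) {
                throw new FooException("cannot download " + url);
            }
            return "content of " + url;
        });
    }

    /** Maps every failure to a DownloadResult instead of rethrowing it. */
    static CompletionStage<DownloadResult> process(String url) {
        return download(url)
            .thenApply(DownloadResult::ok)
            .exceptionally(e -> {
                // Dependent stages see the failure wrapped in a CompletionException.
                Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                    ? e.getCause()
                    : e;
                // Return an error value; throwing here would fail the returned stage.
                return DownloadResult.failed(cause.getMessage());
            });
    }

    public static void main(String[] args) {
        DownloadResult good = process("http://example.com/good").toCompletableFuture().join();
        DownloadResult bad = process("http://example.com/bad").toCompletableFuture().join();
        System.out.println(good.success + " " + good.detail);
        System.out.println(bad.success + " " + bad.detail);
    }
}
```

With this shape, a downstream step such as the reply method in the question can inspect the result and answer the caller with an error, while the stream itself keeps running.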
