如何解决Spring云网关内存泄漏

qltillow  于 12个月前  发布在  Spring
关注(0)|答案(1)|浏览(138)

我在我的服务中使用Spring Cloud Gateway,并在我的LoggingFilter中使用下面的RequestDecorator作为 Package 器。

public class RequestDecorator extends ServerHttpRequestDecorator {

  private final List<DataBuffer> dataBuffers = new ArrayList<>();

  public RequestDecorator(ServerHttpRequest delegate) {
    super(delegate);
    super.getBody()
        .map(
            dataBuffer -> {
              dataBuffers.add(dataBuffer);
              return dataBuffer;
            })
        .subscribe();
  }

  @Override
  public Flux<DataBuffer> getBody() {
    return copy();
  }

  private Flux<DataBuffer> copy() {
    return Flux.fromIterable(dataBuffers)
        .map(dataBuffer -> dataBuffer.factory().wrap(dataBuffer.asByteBuffer()));
  }
}

字符串
当Jmeter使用该服务进行性能测试时,我在日志中发现了以下内存泄漏错误。

i.n.u.ResourceLeakDetector               :   - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
    io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
    io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:785)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:834)


在网上查看了一些内容后,我发现了下面的评论-“如果你使用DataBuffer,你可能会得到同样的错误。Spring有DataBufferUtils库来释放资源。”
DataBufferUtils.release(data Buffer);
但是我想知道我如何在我的装饰器类中使用它,因为我在我的LoggingFilter中使用了这个 Package 器。
有人能给点建议吗?

mzsu5hc0

mzsu5hc01#

你好兄弟这里是我的loggingFilter。我只记录请求对象,因为在我的项目中,响应对象可以是4 5 mb,我不想在日志中看到它。在你的情况下,你应该释放databuffer与这样的东西。doOnDiscard(PooledDataBuffer.class,DataBufferUtils::release);

@Slf4j
@Component
public class LoggingFilter implements GlobalFilter, Ordered {

  @Autowired private Tracer tracer;
  private static final Set<String> LOGGABLE_CONTENT_TYPES =
      new HashSet<>(
          Arrays.asList(
              MediaType.APPLICATION_JSON_VALUE.toLowerCase(),
              MediaType.APPLICATION_JSON_UTF8_VALUE.toLowerCase(),
              MediaType.TEXT_PLAIN_VALUE,
              MediaType.TEXT_XML_VALUE));

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    String traceId =
        tracer.currentSpan() != null
            ? tracer.currentSpan().context().traceIdString()
            : tracer.nextSpan().context().traceIdString();
    ServerHttpRequest mutatedServerHttpRequest =
        exchange.getRequest().mutate().header("x-b3-traceid", traceId).build();
    var requestMutated =
        new ServerHttpRequestDecorator(mutatedServerHttpRequest) {
          @Override
          public Flux<DataBuffer> getBody() {
            var requestLogger = new Logger(getDelegate());
            if (LOGGABLE_CONTENT_TYPES.contains(
                String.valueOf(getHeaders().getContentType()).toLowerCase())) {
              return super.getBody()
                  .map(
                      ds -> {
                        requestLogger.appendBody(ds.asByteBuffer());
                        return ds;
                      })
                  .doFinally((s) -> requestLogger.log())
                      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            } else {
              requestLogger.log();
              return super.getBody().doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            }
          }
        };
    var responseMutated =
        new ServerHttpResponseDecorator(exchange.getResponse()) {
          @Override
          public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            //var responseLogger = new Logger(getDelegate());
            if (LOGGABLE_CONTENT_TYPES.contains(
                String.valueOf(getHeaders().getContentType()).toLowerCase())) {
              return join(body)
                  .flatMap(
                      db -> {
                        //responseLogger.appendBody(db.asByteBuffer());
                        //responseLogger.log();
                        return getDelegate().writeWith(Mono.just(db));
                      }).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            } else {
              //responseLogger.log();
              return getDelegate().writeWith(body).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            }
          }
        };

    return chain.filter(
        exchange.mutate().request(requestMutated).response(responseMutated).build());
  }

  private Mono<? extends DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
    Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
    return Flux.from(dataBuffers)
        .collectList()
        .filter((list) -> !list.isEmpty())
        .map((list) -> list.get(0).factory().join(list))
        .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  }

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE;
  }

  @ToString
  private class Logger {
    private Map<String, String> headers;
    private HttpStatus status;
    private String path;
    private String body;

    Logger(ServerHttpResponse response) {
      headers = response.getHeaders().toSingleValueMap();
      status = HttpStatus.valueOf(response.getStatusCode().value());
    }

    Logger(ServerHttpRequest request) {
      if (tracer.currentSpan() == null || tracer.currentSpan().context() == null) {
        MDC.put("traceId", request.getHeaders().getFirst("x-b3-traceid"));
      } else {
        MDC.put("traceId", tracer.currentSpan().context().traceIdString());
      }
      headers = request.getHeaders().toSingleValueMap();
      path = request.getMethod() + " " + request.getPath();
    }

    void appendBody(ByteBuffer byteBuffer) {
      body = StandardCharsets.UTF_8.decode(byteBuffer).toString();
    }

    void log() {
      log.info(this.toString());
    }
  }
}

字符串

相关问题