websocket: Why do I get a timeout exception after making 4 requests to the database?

k4ymrczo  posted on 2023-06-06  in Other

I'm building a Quarkus WebSocket test application that connects to Postgres through reactive Panache. Code:

@Slf4j
@RequiredArgsConstructor
@ServerEndpoint(value = "/api/websocket",
        decoders = ObjectDecoder.class,
        encoders = ObjectEncoder.class)
public class Websocket {

    private final SimpleService simpleService;

    @OnOpen
    public void onOpen(Session session) {
        log.info("ON OPEN");
        simpleService.getAll().subscribe().with(item -> sendItem(session, item));
    }

    private void sendItem(Session session, SimpleObject item) {
        session.getAsyncRemote().sendObject(item, result -> {
            if (result.getException() != null) {
                result.getException().printStackTrace();
            }
        });
    }

//and other methods...

}

@ApplicationScoped
@RequiredArgsConstructor
public class SimpleService {

    private final SimpleObjectRepository repository; //default repo 

    public Multi<SimpleObject> getAll() {
        return repository.streamAll();
    }
}

@Entity
@Getter
@Setter
public class SimpleObject {

    @Id
    @GeneratedValue(generator = "UUID")
    @GenericGenerator(
            name = "UUID",
            strategy = "org.hibernate.id.UUIDGenerator"
    )
    @Column(name = "id")
    private UUID id;

    @Column(name = "message")
    private String message;
}

quarkus.http.port=8070

quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=postgres
quarkus.datasource.password=docker
quarkus.datasource.reactive.url=postgresql://localhost:5432/reactive_test

quarkus.websocket.dispatch-to-worker=true
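
Note on quarkus.websocket.dispatch-to-worker=true: without this setting I get a session/entity manager closed exception (see "Blocking IO Thread in Quarkus Websocket").

After Postman connects to the WebSocket 4 times, I get a timeout exception: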

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-02-01 13:33:02,159 INFO  [io.und.websockets] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class org.example.reactive_websocket_test.Websocket for path /api/websocket

2022-02-01 13:33:02,191 INFO  [org.hib.rea.pro.imp.ReactiveIntegrator] (JPA Startup Thread: default-reactive) HR000001: Hibernate Reactive
2022-02-01 13:33:02,200 INFO  [io.quarkus] (Quarkus Main Thread) reactive_websocket_test 1.0-SNAPSHOT on JVM (powered by Quarkus 2.6.3.Final) started in 0.600s. Listening on: http://localhost:8070
2022-02-01 13:33:02,202 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-02-01 13:33:02,203 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, hibernate-orm, hibernate-reactive, hibernate-reactive-panache, reactive-pg-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, vertx, websockets, websockets-client]
2022-02-01 13:33:02,204 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 1.531s
2022-02-01 13:33:05,225 INFO  [org.exa.rea.Websocket] (executor-thread-0) ON OPEN
2022-02-01 13:33:10,962 INFO  [org.exa.rea.Websocket] (executor-thread-0) ON CLOSE
2022-02-01 13:33:12,948 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON OPEN
2022-02-01 13:33:14,502 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON CLOSE
2022-02-01 13:33:16,001 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON OPEN
2022-02-01 13:33:17,720 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON CLOSE
2022-02-01 13:33:19,312 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON OPEN
2022-02-01 13:33:21,830 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON CLOSE
2022-02-01 13:33:23,806 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON OPEN
2022-02-01 13:33:53,809 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-7) HR000057: Failed to execute statement [$1select simpleobje0_.id as id1_0_, simpleobje0_.message as message2_0_ from SimpleObject simpleobje0_]: $2could not execute query: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: Timeout
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:362)
        at io.vertx.core.impl.future.FutureImpl$3.onFailure(FutureImpl.java:153)
        at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
        at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
        at io.vertx.core.impl.future.Mapping.onFailure(Mapping.java:45)
        at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
        at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
        at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
        at io.vertx.core.impl.future.PromiseImpl.onFailure(PromiseImpl.java:54)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:43)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
        at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$null$0(SqlConnectionPool.java:202)
        at io.vertx.core.net.impl.pool.SimpleConnectionPool$Cancel.run(SimpleConnectionPool.java:666)
        at io.vertx.core.net.impl.pool.CombinerExecutor.submit(CombinerExecutor.java:50)
        at io.vertx.core.net.impl.pool.SimpleConnectionPool.execute(SimpleConnectionPool.java:240)
        at io.vertx.core.net.impl.pool.SimpleConnectionPool.cancel(SimpleConnectionPool.java:629)
        at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$onEnqueue$1(SqlConnectionPool.java:199)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:893)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:860)
        at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
        at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:168)
        at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:53)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:883)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.vertx.core.impl.NoStackTraceThrowable: Timeout

java.lang.Exception: Missing onFailure/onError handler in the subscriber
        at io.smallrye.mutiny.subscription.Subscribers.lambda$static$0(Subscribers.java:18)
        at io.smallrye.mutiny.subscription.Subscribers$CallbackBasedSubscriber.onFailure(Subscribers.java:84)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.handleTerminationIfDone(MultiFlatMapOp.java:511)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.ifDoneOrCancelled(MultiFlatMapOp.java:486)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:286)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onFailure(MultiFlatMapOp.java:210)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onError(MultiSubscriber.java:73)
        at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onFailure(UniToMultiPublisher.java:92)
        at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:48)
        at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:48)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forwardResult(UniCreateFromCompletionStage.java:58)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:362)
        at io.vertx.core.impl.future.FutureImpl$3.onFailure(FutureImpl.java:153)
        at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
        at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
        at io.vertx.core.impl.future.Mapping.onFailure(Mapping.java:45)
        at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
        at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
        at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
        at io.vertx.core.impl.future.PromiseImpl.onFailure(PromiseImpl.java:54)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:43)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
        at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$null$0(SqlConnectionPool.java:202)
        at io.vertx.core.net.impl.pool.SimpleConnectionPool$Cancel.run(SimpleConnectionPool.java:666)
        at io.vertx.core.net.impl.pool.CombinerExecutor.submit(CombinerExecutor.java:50)
        at io.vertx.core.net.impl.pool.SimpleConnectionPool.execute(SimpleConnectionPool.java:240)
        at io.vertx.core.net.impl.pool.SimpleConnectionPool.cancel(SimpleConnectionPool.java:629)
        at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$onEnqueue$1(SqlConnectionPool.java:199)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:893)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:860)
        at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
        at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:168)
        at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:53)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:883)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.vertx.core.impl.NoStackTraceThrowable: Timeout

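From reading the logs, the exception appears to be related to the Vert.x PG client pool (the PgPool class). After injecting it into the main class and logging its size: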

2022-02-01 13:54:00,189 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 1.461s
2022-02-01 13:54:03,018 INFO  [org.exa.rea.Websocket] (executor-thread-0) ON OPEN
2022-02-01 13:54:03,027 INFO  [org.exa.rea.Websocket] (executor-thread-0) pool size 1
2022-02-01 13:54:05,782 INFO  [org.exa.rea.Websocket] (executor-thread-0) ON CLOSE
2022-02-01 13:54:06,601 INFO  [org.exa.rea.Websocket] (executor-thread-0) ON OPEN
2022-02-01 13:54:06,604 INFO  [org.exa.rea.Websocket] (executor-thread-0) pool size 2
2022-02-01 13:54:09,492 INFO  [org.exa.rea.Websocket] (executor-thread-0) ON CLOSE
2022-02-01 13:54:11,842 INFO  [org.exa.rea.Websocket] (executor-thread-0) ON OPEN
2022-02-01 13:54:11,845 INFO  [org.exa.rea.Websocket] (executor-thread-0) pool size 3
2022-02-01 13:54:16,368 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON OPEN
2022-02-01 13:54:16,370 INFO  [org.exa.rea.Websocket] (executor-thread-2) pool size 4
2022-02-01 13:54:19,293 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON CLOSE
2022-02-01 13:54:21,238 INFO  [org.exa.rea.Websocket] (executor-thread-2) ON OPEN
2022-02-01 13:54:21,242 INFO  [org.exa.rea.Websocket] (executor-thread-2) pool size 4
2022-02-01 13:54:51,244 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-11) HR000057: Failed to execute statement [$1select simpleobje0_.id as id1_0_, simpleobje0_.message as message2_0_ from SimpleObject simpleobje0_]: $2could not execute query: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: Timeout

Apparently the pool connections are not being reused as expected. What am I doing wrong, and how can I fix this?
P.S. Reproducer: https://github.com/hadouken900/quarkus-websocker-test
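
A hedged diagnostic note on the observation above: the logged pool size stops growing at 4, and the error arrives 30 seconds after the fifth ON OPEN. That is consistent with a pool capped at 4 connections and a request timing out while it waits for a free one. The `quarkus.datasource.reactive.max-size` property sets the maximum size of the reactive pool; the value 20 below is an arbitrary assumption chosen for illustration. If connections are in fact never returned to the pool, raising the cap would only postpone the timeout, which would itself help confirm a leak.

```properties
# Assumption, for diagnosis only: raise the reactive pool cap above the 4 observed in the logs.
quarkus.datasource.reactive.max-size=20
```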

xjreopfe #1

I ran into the same problem when using quarkus-hibernate-reactive-panache + quarkus-reactive-pg-client + quarkus-smallrye-graphql:

@GraphQLApi
public class BookResource {

    @Query("allBooks")
    @Description("Get all existing books")
    public Uni<List<Book>> listAllBooks() {
        return Book.listAll();
    }
}

The endpoint works correctly only 4 times, then the timeout exception above occurs. It is always reproducible.

xriantvc #2

The GraphQL problem has been fixed and the fix will be available in 2.10 (see https://github.com/quarkusio/quarkus/pull/25194). I'm not sure about WebSocket.

3duebb1j #3

The "stream" methods of reactive Hibernate need a transaction to work (doc). Something like:

@OnOpen
@ReactiveTransactional
public Uni<Void> onOpen(Session session) {
    log.info("ON OPEN");
    return simpleService.getAll()
        .onOverflow().buffer(100) // consider buffering if the next operation takes time
        // sendItem must return a Uni<*> as well for this to compile
        .onItem().transformToUniAndMerge(item -> sendItem(session, item))
        .onItem().ignoreAsUni();
}

// Not sure whether an @OnOpen method accepts Uni as a return type.

eoxn13cs #4

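A similar problem was fixed in https://github.com/quarkusio/quarkus/issues/23739
...I haven't tried it, but can you annotate the method with @WithSession? I think what is happening is that the session is never closed and the connection is never released. That is why you get the error on the fifth attempt.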
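
The leak hypothesis above can be illustrated with a small JDK-only simulation. This is a sketch, not Quarkus or Vert.x code: `TinyPool`, `leakyRequests`, the cap of 4 and the 50 ms wait are all hypothetical stand-ins, with a `Semaphore` playing the role of the connection pool. Each simulated request takes a connection and never gives it back, so the fifth request times out while waiting.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PoolLeakDemo {

    /** Hypothetical stand-in for a connection pool with a fixed maximum size. */
    public static final class TinyPool {
        private final Semaphore permits;

        public TinyPool(int maxSize) {
            this.permits = new Semaphore(maxSize);
        }

        /** Returns true if a connection was obtained before the wait timed out. */
        public boolean acquire(long timeoutMillis) {
            try {
                return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Hands a connection back to the pool. */
        public void release() {
            permits.release();
        }
    }

    /** Runs the given number of requests without ever releasing; returns how many were served. */
    public static int leakyRequests(TinyPool pool, int requests) {
        int served = 0;
        for (int i = 0; i < requests; i++) {
            if (pool.acquire(50)) {
                served++;
                // Simulated bug: pool.release() is never called, so the connection is lost.
            }
        }
        return served;
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool(4);
        System.out.println("served " + leakyRequests(pool, 5) + " of 5"); // prints "served 4 of 5"
    }
}
```

Calling `release()` after each request in `leakyRequests` would let all five requests succeed, which is the behaviour expected from a pool whose connections are reused.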
A workaround is to use Hibernate Reactive without Panache:

@Inject
Mutiny.SessionFactory sf;

@Funq("providers")
public Uni<List<ProviderEntity>> getProviders() {
    // withSession opens a session and closes it when the returned Uni completes
    return sf.withSession(s -> s.createQuery("from ProviderEntity", ProviderEntity.class).getResultList());
}

But I'm not familiar with Funqy; you should probably create a separate issue for that.
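
The idea behind scoping work inside a session callback can be sketched with plain JDK types. This is an analogy under stated assumptions, not the Hibernate Reactive implementation: `withPermit` and `serve` are hypothetical names, a `Semaphore` stands in for the connection pool, and `CompletableFuture` stands in for `Uni`. The helper returns the connection when the asynchronous work completes, whether it succeeds or fails, so a pool of 4 can serve more than 4 requests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class ScopedSessionDemo {

    /** Runs async work while holding one pool permit and returns the permit when the work completes. */
    public static <T> CompletableFuture<T> withPermit(Semaphore pool, Supplier<CompletableFuture<T>> work) {
        if (!pool.tryAcquire()) {
            return CompletableFuture.failedFuture(new IllegalStateException("pool exhausted"));
        }
        CompletableFuture<T> result;
        try {
            result = work.get();
        } catch (RuntimeException e) {
            // The work could not even be started: give the permit back immediately.
            pool.release();
            return CompletableFuture.failedFuture(e);
        }
        // Release on both success and failure of the asynchronous work.
        return result.whenComplete((value, error) -> pool.release());
    }

    /** Serves the given number of sequential requests against a pool of the given size. */
    public static int serve(int poolSize, int requests) {
        Semaphore pool = new Semaphore(poolSize);
        int served = 0;
        for (int i = 0; i < requests; i++) {
            CompletableFuture<String> reply =
                    withPermit(pool, () -> CompletableFuture.completedFuture("row"));
            if (!reply.isCompletedExceptionally()) {
                served++;
            }
        }
        return served;
    }

    public static void main(String[] args) {
        System.out.println("served " + serve(4, 10) + " of 10"); // prints "served 10 of 10"
    }
}
```

The design point is that the caller never sees the permit, so it cannot forget to return it; the release is tied to completion of the returned future.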
