使用Micronaut WebSocket端点从未绑定Flux发送更新

vxf3dgd4  于 2023-02-19  发布在  其他
关注(0)|答案(1)|浏览(122)

我有一个通量,它在一个时间间隔内发出无限数量的值。我需要一个Websockets端点,它在客户端连接时发出通量的值。
目前我的体会如下:

@ServerWebSocket("/updates")
public class UpdateController {
    private Flux<Update> updates;
    
    // ... left out for brevity

    @OnOpen
    public Flux<Update> onOpen(WebSocketSession session) {
        return updates.flatMap(session::send);
    }

    @OnMessage
    public void onMessage(String content) {
        // do nothing
    }

    @OnClose
    public void onClose(WebSocketSession session) {
        // do nothing
    }
}

这是可行的,但是一旦客户端关闭连接,就会抛出一个异常。这对我来说是有意义的,因为updates Flux仍然会发出值,并且会调用session::send。
但是我怎样才能在某种程度上构造我的代码,使这个异常不被抛出呢?我觉得我遗漏了一些东西。

yqlxgs2m

yqlxgs2m1#

您可以通过添加一个过滤器来检查会话是否在flatMap调用之前打开,从而防止向已关闭的会话发出值:

@OnOpen
public Flux<String> onOpen(WebSocketSession session) {
    return updates
            .filter(it -> session.isOpen())
            .flatMap(session::send);
}

相关问题