当关联对象“过时”时清理Redison发布/子侦听器

mu0hgdu0  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(327)

我试图用java实现一个简单的websocket应用程序,通过使用redis和redison库,它能够水平扩展。
websocket服务器基本上跟踪连接的客户机,并将接收到的消息发送给rtopic—这非常有效。
为了使用,我编写了一段代码,在注册客户机时添加一个侦听器:它通过以下方式将客户机对象与侦听器关联:

private static RedissonClient redisson = RedissonRedisServer.createRedisConnectionWithConfig();
public static final RTopic subcriberTopic = redisson.getTopic("clientsMapTopic");

public static boolean sendToPubSub(ConnectedClient q, String message) {
        boolean[] success = {true};
        MessageListener<Message> listener = new MessageListener<Message>() {
            @Override
            public void onMessage(CharSequence channel, Message message) {
                logger.debug("The message is : " + message.getMediaId());

                try {
                    logger.debug("ConnectedClient mediaid: " + q.getMediaid() + ",Message mediaid " + message.getMediaId());
                    if (q.getMediaid().equals(message.getMediaId())) {
                        // we need to verify if the message goes to the right receiver
                        logger.debug("MESSAGE from PUBSUB to (" + q.getId() + ") @ " + q.getSession().getId() + " " + message);
                        // this is the actual message to the websocket client
                        // this executes on the wrong connected client when the connection is closed and reopened
                        q.getSession().getBasicRemote().sendText(message.getMessage());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    success[0] = false;
                }
            }
        };
        int listenerId = subcriberTopic.addListener(Message.class, listener);
}

我观察到的问题如下:
来自与该对象关联的客户机注册侦听器的初始连接
发送到ws-server的消息被侦听器接收并正确发送
断开websocket-创建新连接-创建新侦听器
发送到ws-server的消息由相同的原始侦听器接收,并使用该连接的客户机而不是新注册的客户机
发送失败(因为客户端和ws-connection不存在)并且无法进一步处理
如果客户机被删除,我似乎只需要删除客户机的侦听器,但是我还没有找到一个好的方法来这样做,因为尽管我在调试器中看到侦听器具有关联的已连接客户机对象,但是如果不为此添加代码,我就无法检索它们。
我是否正确地观察到了这一点?什么是使其正常工作的好方法?

vsnjm48y

vsnjm48y1#

当我写这个问题的时候,我有点倾向于一个我已经想好并尝试过的答案,这很有效。我添加了一个concurrenthashmap来跟踪连接的客户机和侦听器之间的关系。在我处理指向客户端删除的websocket错误的逻辑中,我删除了关联的侦听器(以及Map中的条目)。现在它可以正常工作了。
小片段:

int listenerId = subcriberTopic.addListener(Message.class, listener);
clientListeners.put(q,(Integer)listenerId);

然后在触发清理的websocket onerror处理程序中:

// remove the associated listener
int listenerIdForClient = MessageContainer.clientListeners.get(cP);
MessageContainer.subcriberTopic.removeListener((Integer) listenerIdForClient);
// remove entry from map
MessageContainer.clientListeners.remove(cP);

现在监听器得到了正确的清理,下次创建新的监听器并处理消息时。

相关问题