节点redis xread阻塞订阅

mzmfm0qo  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(673)

我正在将redis发布/子系统转换为redis流,以便可以为服务器发送的事件添加一些容错性。
传统的订阅方式很简单:

import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});

redis.subscribe('foo');

这会永久阻塞,并保持连接打开。在这种情况下,发布到redis将添加到您的日志中。

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

切换到流,使用 XREAD 而不是订阅 XADD 而不是发布,数据就大不相同了。我正在挣扎的部分是阻塞。

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);

  str.forEach(message => {
    console.log(message);
  });
}

发送消息时,我的“订阅”会接收第一条消息,但不会记录更多消息。

6xfqseft

6xfqseft1#

说实话,我之所以问这个问题,是因为谷歌对我没有好处,而且我找不到其他人发布关于这个问题的帖子。希望这有帮助!
所以, XREAD 只在第一次呼叫时阻塞。它将在一个设定的时间段内等待(如果您将时间设置为0,则无限期等待),但一旦它接收到数据,它的职责就被认为已完成,并且它将取消阻止。为了保持“订阅”的活力,你需要打电话 XREAD 同样,使用流中最新的id。这将替换初始值 $ 我们传递了它。
递归似乎是一个完美的解决方案:

const xread = ({ stream, id }) => {
  redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
    if (err) return console.error('Error reading from stream:', err);

    str[0][1].forEach(message => {
      id = message[0];
      console.log(id);
      console.log(message[1]);
    });

    xread({ stream, id })
  });
}

xread({ stream: 'asdf', id: '$' })

相关问题