在redis中,如何保证在多客户端环境中从列表中获取n个项目?

2exbekwf  于 2021-06-08  发布在  Redis
关注(0)|答案(2)|浏览(346)

假设redis中有一个键k,它持有一个值列表。
许多producer客户机正在使用lpush或rpush将元素逐个添加到此列表中。
另一方面,另一组客户机正在从列表中弹出元素,尽管有一定的限制。消费者只会尝试弹出n个项目,只有当列表包含至少n个项目时。这样可以确保消费者在完成弹出过程后手里会拿着n件物品
如果列表包含的项目数少于n个,那么消费者甚至不应该尝试从列表中弹出元素,因为他们在最后不会有至少n个项目。
如果只有一个客户机,那么客户机可以简单地运行 LLEN 命令检查列表是否至少包含n个项,并使用lpop/rpop减去n。
但是,如果有许多消费者客户机,则可能存在竞争条件,并且他们可以在读取后同时从列表中弹出项目 LLEN >=n。因此,我们可能最终处于这样一种状态,即每个消费者可能弹出少于n个元素,并且redis中的列表中没有剩余的项。
使用一个单独的锁定系统似乎是解决这个问题的一种方法,但是我很好奇这种类型的操作是否只能使用redis命令来完成,比如multi/exec/watch等等。
我检查了multi/exec方法,似乎它们不支持回滚。另外,multi/exec事务之间执行的所有命令都将返回'queued',因此我无法知道在事务中执行的n个lpop是否都将返回元素。

gcuhipw9

gcuhipw91#

所以你所需要的是一个原子的方式来检查列表长度和弹出条件。
这就是lua脚本的用途,请参见 EVAL 命令。
下面是一个lua脚本,让您开始:

local len = redis.call('LLEN', KEYS[1])
if len >= tonumber(ARGV[1]) then
  local res = {n=len}
  for i=1,len do
    res[i] = redis.call('LPOP', KEYS[1])
  end
  return res
else
  return false
end

用作

EVAL "local len = redis.call('LLEN', KEYS[1]) \n if len >= tonumber(ARGV[1]) then \n   local res = {n=len} \n   for i=1,len do \n     res[i] = redis.call('LPOP', KEYS[1]) \n   end \n   return res \n else \n   return false \n end" 1 list 3

这只会爆炸 ARGV[1] 如果列表中至少有那么多元素,则返回列表中的元素(键名后面的数字)。
lua脚本是以原子方式运行的,因此在读取客户机之间没有竞争条件。
正如op在评论中指出的那样,有数据丢失的风险,比如说因为 LPOP 然后脚本返回。你可以用 RPOPLPUSH 而不是 LPOP ,将元素存储在临时列表中。然后还需要一些跟踪、删除和恢复逻辑。注意你的客户也可能会死,留下一些未处理的元素。
您可能想看看redis streams。这种数据结构非常适合在多个客户机之间分配负载。当与消费者组一起使用时,它有一个挂起条目列表(pel),充当临时列表。
然后客户端执行 XACK 一经处理就从像素中除去元素。然后,您还可以免受客户端故障的影响。
redis流对于解决您试图解决的复杂问题非常有用。你可能想免费上这门课。

i7uaboj4

i7uaboj42#

你可以用预取器。
与每个消费者贪婪地从队列中挑选一个商品导致“到处都是水,但一滴也不能喝”的问题不同,您可以使用一个预取器来构建一个大小为6的数据包。当预取器有一个完整的数据包时,它可以将该项目放在一个单独的数据包队列(另一个redis键,带有数据包列表)中,并在一个事务中从主队列弹出项目。基本上,你写的是:
如果只有一个客户机,那么客户机可以简单地运行llen命令来检查列表是否至少包含n个项目,并使用lpop/rpop减去n。
如果预取器没有一个完整的数据包,它就什么也不做,一直等待主队列大小达到6。
在用户端,他们只需查询预取的数据包队列,弹出最上面的数据包就可以了。它总是1个预构建包(大小=6个项目)。如果没有可用的数据包,它们将等待。
在生产者方面,不需要任何改变。他们可以继续插入主队列。
顺便说一句,可以有多个预取器任务同时运行,它们可以在它们之间同步对主队列的访问。

实现可扩展的预取器

预取器的实现可以用自助餐表来描述。把排队的队伍想象成餐厅的自助餐桌,客人可以在那里拿食物离开。礼仪要求客人按照排队制度排队等候。预取程序也会遵循类似的方法。算法如下:

Algorithm Prefetch
Begin
   while true
      check = main queue has 6 items or more    // this is a queue read. no locks required
      if(check == true)
         obtain an exclusive lock on the main queue
         if lock successful
            begin a transaction
            create a packet and fill it with top 6 items from 
              the queue after popping them
            add the packet to the prefetch queue

            if packet added to prefetch queue successfully
              commit the transaction
            else
              rollback the transaction
            end if

            release the lock

         else
            // someone else has the excl lock, we should just wait
            sleep for xx millisecs
         end if
      end if
   end while

End

为了简单起见,我在这里展示了一个无限轮询循环。但这可以通过redis通知使用pub/sub模式来实现。因此,预取程序只需等待主队列键正在接收lpush的通知,然后在 while 上面的环体。
你还有其他方法可以做到这一点。但这会给你一些想法。

相关问题