假设redis中有一个键k,它持有一个值列表。
许多producer客户机正在使用lpush或rpush将元素逐个添加到此列表中。
另一方面,另一组客户机正在从列表中弹出元素,尽管有一定的限制。消费者只会尝试弹出n个项目,只有当列表包含至少n个项目时。这样可以确保消费者在完成弹出过程后手里会拿着n件物品
如果列表包含的项目数少于n个,那么消费者甚至不应该尝试从列表中弹出元素,因为他们在最后不会有至少n个项目。
如果只有一个客户机,那么客户机可以简单地运行 LLEN
命令检查列表是否至少包含n个项,并使用lpop/rpop减去n。
但是,如果有许多消费者客户机,则可能存在竞争条件,并且他们可以在读取后同时从列表中弹出项目 LLEN
>=n。因此,我们可能最终处于这样一种状态,即每个消费者可能弹出少于n个元素,并且redis中的列表中没有剩余的项。
使用一个单独的锁定系统似乎是解决这个问题的一种方法,但是我很好奇这种类型的操作是否只能使用redis命令来完成,比如multi/exec/watch等等。
我检查了multi/exec方法,似乎它们不支持回滚。另外,multi/exec事务之间执行的所有命令都将返回'queued',因此我无法知道在事务中执行的n个lpop是否都将返回元素。
2条答案
按热度按时间gcuhipw91#
所以你所需要的是一个原子的方式来检查列表长度和弹出条件。
这就是lua脚本的用途,请参见
EVAL
命令。下面是一个lua脚本,让您开始:
用作
这只会爆炸
ARGV[1]
如果列表中至少有那么多元素,则返回列表中的元素(键名后面的数字)。lua脚本是以原子方式运行的,因此在读取客户机之间没有竞争条件。
正如op在评论中指出的那样,有数据丢失的风险,比如说因为
LPOP
然后脚本返回。你可以用RPOPLPUSH
而不是LPOP
,将元素存储在临时列表中。然后还需要一些跟踪、删除和恢复逻辑。注意你的客户也可能会死,留下一些未处理的元素。您可能想看看redis streams。这种数据结构非常适合在多个客户机之间分配负载。当与消费者组一起使用时,它有一个挂起条目列表(pel),充当临时列表。
然后客户端执行
XACK
一经处理就从像素中除去元素。然后,您还可以免受客户端故障的影响。redis流对于解决您试图解决的复杂问题非常有用。你可能想免费上这门课。
i7uaboj42#
你可以用预取器。
与每个消费者贪婪地从队列中挑选一个商品导致“到处都是水,但一滴也不能喝”的问题不同,您可以使用一个预取器来构建一个大小为6的数据包。当预取器有一个完整的数据包时,它可以将该项目放在一个单独的数据包队列(另一个redis键,带有数据包列表)中,并在一个事务中从主队列弹出项目。基本上,你写的是:
如果只有一个客户机,那么客户机可以简单地运行llen命令来检查列表是否至少包含n个项目,并使用lpop/rpop减去n。
如果预取器没有一个完整的数据包,它就什么也不做,一直等待主队列大小达到6。
在用户端,他们只需查询预取的数据包队列,弹出最上面的数据包就可以了。它总是1个预构建包(大小=6个项目)。如果没有可用的数据包,它们将等待。
在生产者方面,不需要任何改变。他们可以继续插入主队列。
顺便说一句,可以有多个预取器任务同时运行,它们可以在它们之间同步对主队列的访问。
实现可扩展的预取器
预取器的实现可以用自助餐表来描述。把排队的队伍想象成餐厅的自助餐桌,客人可以在那里拿食物离开。礼仪要求客人按照排队制度排队等候。预取程序也会遵循类似的方法。算法如下:
为了简单起见,我在这里展示了一个无限轮询循环。但这可以通过redis通知使用pub/sub模式来实现。因此,预取程序只需等待主队列键正在接收lpush的通知,然后在
while
上面的环体。你还有其他方法可以做到这一点。但这会给你一些想法。