PHP: handling duplicates in message queues [closed]

Posted by dxpyg8gm on 2023-10-15 in PHP

Closed. This question is opinion-based. It is not currently accepting answers.

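I've been arguing with my fellow programmer about the best way to go about this. We have data that comes in at a rate of about 10000 objects per second. It needs to be processed asynchronously, but loose ordering is sufficient, so each object is inserted round-robin into one of several message queues (there are also several producers and consumers). Each object is about 300 bytes. It also needs to be durable, so the MQs are configured to persist to disk.
The problem is that these objects are often duplicated (they are already duplicated in the data that arrives at the producers). They do have 10-byte unique IDs. It's not catastrophic if objects are duplicated in the queue, but it is if they're duplicated in the processing after being taken from the queue. What's the best way to get as close as possible to linear scalability while ensuring there is no duplication in the processing of the objects? Perhaps linked to this, should the whole object be stored in the message queue, or only the ID, with the body stored in something like Cassandra?
Thanks!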

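**Edit:** Confirmed where the duplication occurs. Also, so far I've had two recommendations for Redis. I had previously been considering RabbitMQ. What are the pros and cons of each with regard to my requirements?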

mzsu5hc0 #1

P.S.: This is the first time in my life that the Redis website has had problems, but I bet they will have fixed it by the time you visit.

> We have data that comes in at a rate
> of about 10000 objects per second.
> This needs to be processed
> asynchronously, but loose ordering is
> sufficient, so each object is inserted
> round-robin-ly into one of several
> message queues (there are also several
> producers and consumers)

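My first suggestion would be to look at Redis, because it is very fast and I bet you can handle all your messages with a single message queue.
First, I'd like to show you the specs of my laptop (I like it, but a big server would be a lot faster). My dad recently bought a new PC that beats my laptop easily (8 CPUs instead of 2).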

-Computer-
Processor       : 2x Intel(R) Core(TM)2 Duo CPU     T7100  @ 1.80GHz
Memory      : 2051MB (1152MB used)
Operating System        : Ubuntu 10.10
User Name       : alfred (alfred)
-Display-
Resolution      : 1920x1080 pixels
OpenGL Renderer     : Unknown
X11 Vendor      : The X.Org Foundation
-Multimedia-
Audio Adapter       : HDA-Intel - HDA Intel
-Input Devices-
 Power Button
 Lid Switch
 Sleep Button
 Power Button
 AT Translated Set 2 keyboard
 Microsoft Comfort Curve Keyboard 2000
 Microsoft Comfort Curve Keyboard 2000
 Logitech Trackball
 Video Bus
 PS/2 Logitech Wheel Mouse
-SCSI Disks-
HL-DT-ST DVDRAM GSA-T20N
ATA WDC WD1600BEVS-2

Below are the results of running redis-benchmark on my machine, without doing much Redis tuning at all:

alfred@alfred-laptop:~/database/redis-2.2.0-rc4/src$ ./redis-benchmark 
====== PING (inline) ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

94.84% <= 1 milliseconds
98.74% <= 2 milliseconds
99.65% <= 3 milliseconds
100.00% <= 4 milliseconds
46296.30 requests per second

====== PING ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.30% <= 1 milliseconds
98.21% <= 2 milliseconds
99.29% <= 3 milliseconds
99.52% <= 4 milliseconds
100.00% <= 4 milliseconds
45662.10 requests per second

====== MSET (10 keys) ======
  10000 requests completed in 0.32 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

3.45% <= 1 milliseconds
88.55% <= 2 milliseconds
97.86% <= 3 milliseconds
98.92% <= 4 milliseconds
99.80% <= 5 milliseconds
99.94% <= 6 milliseconds
99.95% <= 9 milliseconds
99.96% <= 10 milliseconds
100.00% <= 10 milliseconds
30864.20 requests per second

====== SET ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

92.45% <= 1 milliseconds
98.78% <= 2 milliseconds
99.00% <= 3 milliseconds
99.01% <= 4 milliseconds
99.53% <= 5 milliseconds
100.00% <= 5 milliseconds
47169.81 requests per second

====== GET ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

94.50% <= 1 milliseconds
98.21% <= 2 milliseconds
99.50% <= 3 milliseconds
100.00% <= 3 milliseconds
47619.05 requests per second

====== INCR ======
  10000 requests completed in 0.23 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.90% <= 1 milliseconds
97.45% <= 2 milliseconds
98.59% <= 3 milliseconds
99.51% <= 10 milliseconds
99.78% <= 11 milliseconds
100.00% <= 11 milliseconds
44444.45 requests per second

====== LPUSH ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

95.02% <= 1 milliseconds
98.51% <= 2 milliseconds
99.23% <= 3 milliseconds
99.51% <= 5 milliseconds
99.52% <= 6 milliseconds
100.00% <= 6 milliseconds
47619.05 requests per second

====== LPOP ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

95.89% <= 1 milliseconds
98.69% <= 2 milliseconds
98.96% <= 3 milliseconds
99.51% <= 5 milliseconds
99.98% <= 6 milliseconds
100.00% <= 6 milliseconds
47619.05 requests per second

====== SADD ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.08% <= 1 milliseconds
97.79% <= 2 milliseconds
98.61% <= 3 milliseconds
99.25% <= 4 milliseconds
99.51% <= 5 milliseconds
99.81% <= 6 milliseconds
100.00% <= 6 milliseconds
45454.55 requests per second

====== SPOP ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.88% <= 1 milliseconds
98.64% <= 2 milliseconds
99.09% <= 3 milliseconds
99.40% <= 4 milliseconds
99.48% <= 5 milliseconds
99.60% <= 6 milliseconds
99.98% <= 11 milliseconds
100.00% <= 11 milliseconds
46296.30 requests per second

====== LPUSH (again, in order to bench LRANGE) ======
  10000 requests completed in 0.23 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.00% <= 1 milliseconds
97.82% <= 2 milliseconds
99.01% <= 3 milliseconds
99.56% <= 4 milliseconds
99.73% <= 5 milliseconds
99.77% <= 7 milliseconds
100.00% <= 7 milliseconds
44247.79 requests per second

====== LRANGE (first 100 elements) ======
  10000 requests completed in 0.39 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

6.24% <= 1 milliseconds
75.78% <= 2 milliseconds
93.69% <= 3 milliseconds
97.29% <= 4 milliseconds
98.74% <= 5 milliseconds
99.45% <= 6 milliseconds
99.52% <= 7 milliseconds
99.93% <= 8 milliseconds
100.00% <= 8 milliseconds
25906.74 requests per second

====== LRANGE (first 300 elements) ======
  10000 requests completed in 0.78 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

1.30% <= 1 milliseconds
5.07% <= 2 milliseconds
36.42% <= 3 milliseconds
72.75% <= 4 milliseconds
93.26% <= 5 milliseconds
97.36% <= 6 milliseconds
98.72% <= 7 milliseconds
99.35% <= 8 milliseconds
100.00% <= 8 milliseconds
12886.60 requests per second

====== LRANGE (first 450 elements) ======
  10000 requests completed in 1.10 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

0.67% <= 1 milliseconds
3.64% <= 2 milliseconds
8.01% <= 3 milliseconds
23.59% <= 4 milliseconds
56.69% <= 5 milliseconds
76.34% <= 6 milliseconds
90.00% <= 7 milliseconds
96.92% <= 8 milliseconds
98.55% <= 9 milliseconds
99.06% <= 10 milliseconds
99.53% <= 11 milliseconds
100.00% <= 11 milliseconds
9066.18 requests per second

====== LRANGE (first 600 elements) ======
  10000 requests completed in 1.48 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

0.85% <= 1 milliseconds
9.23% <= 2 milliseconds
11.03% <= 3 milliseconds
15.94% <= 4 milliseconds
27.55% <= 5 milliseconds
41.10% <= 6 milliseconds
56.23% <= 7 milliseconds
78.41% <= 8 milliseconds
87.37% <= 9 milliseconds
92.81% <= 10 milliseconds
95.10% <= 11 milliseconds
97.03% <= 12 milliseconds
98.46% <= 13 milliseconds
99.05% <= 14 milliseconds
99.37% <= 15 milliseconds
99.40% <= 17 milliseconds
99.67% <= 18 milliseconds
99.81% <= 19 milliseconds
99.97% <= 20 milliseconds
100.00% <= 20 milliseconds
6752.19 requests per second

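As you can see from the benchmark on my modest laptop, you probably need just one message queue, because Redis can handle 10000 LPUSH requests in 0.23 seconds and 10000 LPOP requests in 0.21 seconds. When you only need one queue, I believe your problem is no longer a problem (or do the producers themselves produce duplicates? I don't fully understand that part).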
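To put those two figures next to the 10000 objects per second from the question, here is a small back-of-the-envelope calculation in Python. It only rearranges numbers already shown above; keep in mind that the benchmark used 3-byte payloads and 50 parallel clients, so it is an optimistic bound for roughly 300-byte objects, not a measurement of the real workload.

```python
# Figures copied from the redis-benchmark output above (3-byte payloads).
REQUESTS = 10_000
LPUSH_SECONDS = 0.23
LPOP_SECONDS = 0.21
REQUIRED_PER_SECOND = 10_000  # ingest rate stated in the question


def throughput(requests: int, seconds: float) -> float:
    """Requests per second for one benchmark run."""
    return requests / seconds


lpush_rate = throughput(REQUESTS, LPUSH_SECONDS)
lpop_rate = throughput(REQUESTS, LPOP_SECONDS)

# The slower of the two operations limits a single queue.
headroom = min(lpush_rate, lpop_rate) / REQUIRED_PER_SECOND

print(f"LPUSH: {lpush_rate:,.0f} requests/s")
print(f"LPOP:  {lpop_rate:,.0f} requests/s")
print(f"Headroom over the required rate: {headroom:.1f}x")
```

On this laptop the slower operation still leaves roughly four times the required rate, which is the basis for the single-queue suggestion.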

> And it needs to be durable, so the MQs
> are configured to persist to disk.

Redis also persists to disk.

> The problem is that often these
> objects are duplicated. They do have
> 10-byte unique ids. It's not
> catastrophic if objects are duplicated
> in the queue, but it is if they're
> duplicated in the processing after
> being taken from the queue. What's the
> best way to go about ensuring as close
> as possible to linear scalability
> whilst ensuring there's no duplication
> in the processing of the objects?

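With a single message queue (box), this problem does not exist, if I understand correctly. If it does, you can simply check whether the ID is a member of your set of IDs (Redis SISMEMBER). Once you have processed the ID, you should remove it from the set (SREM). Before any of that, you add the IDs to the set using SADD.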
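A minimal sketch of the set-based check described above, written in Python rather than PHP. The key names `pending_ids` and `objects`, the helper names, and the `InMemoryStore` class are assumptions made for illustration. `InMemoryStore` only mimics the return values of the `sadd`, `srem`, `lpush` and `rpop` methods of the redis-py client so that the example runs without a server. The sketch relies on SADD reporting whether the member was new, which folds the membership check and the add into one command instead of a separate SISMEMBER call.

```python
from collections import deque

ID_LENGTH = 10               # the question states 10-byte unique IDs
PENDING_IDS = "pending_ids"  # assumed key name for the set of queued IDs
QUEUE = "objects"            # assumed key name for the queue


class InMemoryStore:
    """Hypothetical stand-in for the few redis-py methods used below."""

    def __init__(self):
        self._sets = {}
        self._lists = {}

    def sadd(self, key, member):
        members = self._sets.setdefault(key, set())
        if member in members:
            return 0  # like SADD: nothing was added
        members.add(member)
        return 1

    def srem(self, key, member):
        members = self._sets.get(key, set())
        if member in members:
            members.remove(member)
            return 1
        return 0

    def lpush(self, key, value):
        items = self._lists.setdefault(key, deque())
        items.appendleft(value)
        return len(items)

    def rpop(self, key):
        items = self._lists.get(key)
        return items.pop() if items else None


def enqueue(store, object_id: bytes, payload: bytes) -> bool:
    """Queue the object unless its ID is already pending."""
    if store.sadd(PENDING_IDS, object_id) == 0:
        return False  # duplicate of an object that is still queued
    store.lpush(QUEUE, object_id + payload)
    return True


def process_next(store, handler) -> bool:
    """Pop one message, handle it, then release its ID."""
    message = store.rpop(QUEUE)
    if message is None:
        return False
    object_id, payload = message[:ID_LENGTH], message[ID_LENGTH:]
    handler(object_id, payload)
    store.srem(PENDING_IDS, object_id)
    return True


store = InMemoryStore()
print(enqueue(store, b"0123456789", b"body"))  # first arrival is queued
print(enqueue(store, b"0123456789", b"body"))  # duplicate is dropped
```

This sketch has limits. Because the ID is removed once the object has been processed, a copy that arrives later would be processed again; ruling that out would mean keeping processed IDs for longer. SADD and LPUSH are also two separate commands here, so a crash between them would leave an ID marked as pending with nothing in the queue.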
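If a single box no longer scales, you should split your keys across multiple boxes and check each key on the box it belongs to. To learn more about this, I think you should read the following link: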
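As an illustration of splitting keys across boxes (my reading of the idea, not the content of the link), the sketch below hashes each ID to pick a box, so every copy of the same ID is always checked on the same box. The function `box_for` and the class `ShardedSeenSet` are hypothetical names, and plain Python sets stand in for the per-box Redis sets.

```python
import zlib


def box_for(object_id: bytes, box_count: int) -> int:
    """Map an ID to a box index; the same ID always maps to the same box."""
    return zlib.crc32(object_id) % box_count


class ShardedSeenSet:
    """Duplicate check split across several boxes (one set per box)."""

    def __init__(self, box_count: int):
        # Each Python set stands in for the ID set held on one Redis box.
        self._boxes = [set() for _ in range(box_count)]

    def add_if_new(self, object_id: bytes) -> bool:
        """Return True the first time an ID is seen, False for repeats."""
        box = self._boxes[box_for(object_id, len(self._boxes))]
        if object_id in box:
            return False
        box.add(object_id)
        return True


seen = ShardedSeenSet(box_count=4)
print(seen.add_if_new(b"0123456789"))  # first sighting
print(seen.add_if_new(b"0123456789"))  # repeat, routed to the same box
```

One design caveat: with simple modulo hashing, changing the number of boxes remaps most IDs, so the box count would need to stay fixed or a consistent-hashing scheme would be needed.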

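> Perhaps linked to this, should the whole object be stored in the
> message queue, or only the id with the body stored in something
> like cassandra?

If possible, you should keep all your information directly in memory, because nothing is as fast as memory (okay, your CPU cache is even faster, but it is really small and you cannot access it from your code). Redis keeps all your data in memory and writes snapshots to disk. I think you should be able to hold everything in memory and skip something like Cassandra altogether.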
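Let's assume each object is 400 bytes in total, arriving at 10000 objects per second => 4000000 bytes per second for all objects => 4 MB/s, if my calculation is correct. You can easily hold that amount of data in memory. If you can't, you should really consider upgrading your memory if at all possible, because memory is not that expensive anymore.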
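The same arithmetic as a short Python calculation, extended to one hour to show how a backlog would grow if nothing were consumed. The 400-byte object size is the estimate used above (the question says about 300 bytes), 1 MB is taken as 10^6 bytes, and Redis's own per-key overhead is ignored, so treat the results as rough lower bounds.

```python
OBJECTS_PER_SECOND = 10_000  # rate from the question
BYTES_PER_OBJECT = 400       # estimate used above; the question says ~300
ID_BYTES = 10                # size of the unique ID from the question
SECONDS_PER_HOUR = 3_600
MB = 1_000_000               # decimal megabyte, as in the 4 MB/s figure


def bytes_per_second(rate: int, size: int) -> int:
    """Raw data volume per second for a given object rate and size."""
    return rate * size


object_rate = bytes_per_second(OBJECTS_PER_SECOND, BYTES_PER_OBJECT)
id_rate = bytes_per_second(OBJECTS_PER_SECOND, ID_BYTES)

print(f"Objects: {object_rate / MB:.1f} MB/s")
print(f"Objects left unconsumed for an hour: "
      f"{object_rate * SECONDS_PER_HOUR / MB:,.0f} MB")
print(f"IDs only: {id_rate / MB:.1f} MB/s")
```

The memory that is actually needed depends on how far the consumers lag behind the producers, since objects that have been popped no longer occupy the queue.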

fsi0uk1n #2

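Without knowing how the messages are created in the system, which mechanism the producers use to publish to the queue, and which queue system is in use, it is hard to diagnose what is going on.
I have seen this happen in many different ways: timed-out workers that cause a message to become visible in the queue again (so it gets processed a second time, which is common with Kestrel), misconfigured brokers (HA ActiveMQ comes to mind), misconfigured clients (Spring plus Camel routing comes to mind), clients double-submitting, and so on. There are many ways this kind of problem can arise.
Since I can't really diagnose the problem, I'll plug Redis here. You can easily combine SPOP (which, like SADD, is O(1)) with pub/sub to get a very fast, constant-time, duplicate-free queue (a set can only contain unique elements). Although it is a Ruby project, resque may be able to help. It is at least worth a look.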
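A minimal sketch of the SADD/SPOP idea in Python, leaving out the pub/sub notification. The key name `work_set`, the helper names, and the `InMemorySetStore` class are assumptions for illustration; the class only imitates the `sadd` and `spop` methods of the redis-py client so the example runs without a server.

```python
import random

WORK_SET = "work_set"  # assumed key name for the set used as a queue


class InMemorySetStore:
    """Hypothetical stand-in for the redis-py sadd/spop methods."""

    def __init__(self):
        self._sets = {}

    def sadd(self, key, member):
        members = self._sets.setdefault(key, set())
        if member in members:
            return 0  # already present, nothing added
        members.add(member)
        return 1

    def spop(self, key):
        members = self._sets.get(key)
        if not members:
            return None
        # Like Redis SPOP, remove and return a random member.
        member = random.choice(tuple(members))
        members.remove(member)
        return member


def publish(store, message: bytes) -> bool:
    """Add a message; an identical message already waiting is ignored."""
    return store.sadd(WORK_SET, message) == 1


def consume(store):
    """Take one waiting message, or None when the set is empty."""
    return store.spop(WORK_SET)


store = InMemorySetStore()
print(publish(store, b"0123456789body"))  # queued
print(publish(store, b"0123456789body"))  # duplicate, ignored
print(consume(store))                     # the single queued copy
print(consume(store))                     # nothing left
```

Two things to keep in mind: SPOP returns a random member, so this gives no ordering at all, and the set only suppresses duplicates that are waiting at the same time, not a copy that arrives after the first one has been popped.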
Good luck.

fdbelqdn #3

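If you don't mind adding Camel to the mix, you can use the idempotent-consumer EIP to help with this.
In addition, ActiveMQ Message Groups can be used to group related messages, which makes duplicate checks easier to perform while still maintaining high throughput.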
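To show what the idempotent consumer pattern does in general terms, here is a small Python sketch. It is not Camel's API: the class `IdempotentConsumer` and its method names are hypothetical, and an in-memory set plays the role of the repository of already-seen IDs.

```python
class IdempotentConsumer:
    """Wrap a handler so each message ID is processed at most once."""

    def __init__(self, handler, seen=None):
        self._handler = handler
        # Repository of IDs that have already been accepted.
        self._seen = set() if seen is None else seen

    def consume(self, object_id: bytes, payload: bytes) -> bool:
        """Process the message unless its ID was seen; True if processed."""
        if object_id in self._seen:
            return False
        self._seen.add(object_id)
        try:
            self._handler(object_id, payload)
        except Exception:
            # Forget the ID so a redelivery can retry the failed message.
            self._seen.discard(object_id)
            raise
        return True


processed = []
consumer = IdempotentConsumer(lambda oid, body: processed.append(oid))
print(consumer.consume(b"0123456789", b"body"))  # processed
print(consumer.consume(b"0123456789", b"body"))  # skipped as a duplicate
```

With several consumer processes, the repository would have to live in shared storage rather than in one process's memory, and it would need some expiry policy so that it does not grow without bound.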
