我刚刚读了RabbitMQ's Java API docs,发现它信息量很大,而且很直接。这个例子说明了如何设置一个简单的Channel
来发布/消费,非常容易理解。但是它是一个非常简单/基本的例子,它给我留下了一个重要的问题:如何设置1+ Channels
以向多个队列发布/从多个队列使用?
假设我有一个RabbitMQ服务器,上面有3个队列:logging
、security_events
和customer_orders
。因此,我们要么需要一个Channel
,以便能够向所有3个队列发布/消费,要么更有可能的是,使用3个单独的Channels
,每个Channels
专用于一个队列。
除此之外,RabbitMQ的最佳实践要求我们为每个消费者线程设置1个Channel
。对于本例,假设security_events
只需要1个消费者线程就可以了,但是logging
和customer_order
都需要5个线程来处理卷。那么,如果我理解正确的话,这是否意味着我们需要:
- 1个
Channel
和1个用于向security_events
发布/从security_events
消费的消费者线程;以及 - 5个
Channels
和5个用于向logging
发布/从logging
消费的消费者线程;以及 - 5个
Channels
和5个用于向customer_orders
和从customer_orders
发布/使用的使用者线程?
如果我的理解有误,请纠正我。不管怎样,某个厌倦了战斗的RabbitMQ老手可以用一个像样的代码示例来帮助我“连接点”,以设置满足我这里要求的发布者/消费者吗?
3条答案
按热度按时间l5tcr1uw1#
我觉得你在初步认识上有几个问题,坦白说,看到以下几点我有点吃惊:
both need 5 threads to handle the volume
。你是如何确定你需要的确切数量的?你有任何保证5个线程将足够?RabbitMQ经过了调优和时间测试,所以它完全是关于适当的设计和高效的消息处理。
让我们试着回顾一下这个问题并找到一个合适的解决方案。顺便说一句,消息队列本身并不能保证你有真正好的解决方案。你必须理解你在做什么,并做一些额外的测试。
正如你肯定知道有许多布局可能:
我将使用布局
B
作为说明1
生产者N
消费者问题的最简单方法。既然您如此担心吞吐量。顺便说一句,正如您所期望的那样,RabbitMQ表现得相当好(source)。请注意prefetchCount
,我稍后将解决它:因此,消息处理逻辑很可能是确保有足够吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨越一个新线程,但最终这种方法会杀死您的系统。基本上,线程越多,延迟就越大(如果需要,您可以选择Amdahl's law)。
(see Amdahl’s law illustrated)
线程池可以描述为Runnable对象(工作队列)的集合和正在运行的线程的连接。这些线程持续运行,并检查工作查询是否有新的工作。如果有新的工作要做,它们会执行这个Runnable。Thread类本身提供了一个方法,例如execute(Runnable r)来向工作队列添加新的Runnable对象。
我认为这是一种明显的优化技术。很可能你会发送小而容易处理的消息。整个方法就是不断地设置和处理小消息。大消息最终会开一个坏玩笑,所以最好避免。
因此,最好发送微小的信息片段,但是处理怎么办?每次提交作业都会产生开销。在传入消息率较高的情况下,批处理非常有用。
例如,假设我们有简单的消息处理逻辑,并且不希望每次处理消息时都有特定于线程的开销,为了优化这个非常简单的
CompositeRunnable can be introduced
:或者通过收集要处理的消息以稍微不同的方式执行相同的操作:
通过这种方式,您可以更有效地处理消息。
尽管您知道可以并行处理消息(
Tip #1
)并减少处理开销(Tip #2
),但您必须快速完成所有操作。冗余的处理步骤、繁重的循环等可能会严重影响性能。请参阅有趣的案例研究:Improving Message Queue Throughput tenfold by choosing the right XML Parser
(source)
请注意,所有的提示是完美的工作在一起。随时让我知道如果你需要额外的细节。
请注意以下事项:
prefetchCount
可能非常有用:此命令允许使用者选择预取窗口,该窗口指定了它准备接收的未确认消息的数量。通过将预取计数设置为非零值,代理将不会向使用者传送任何超过该限制的消息。要向前移动窗口,使用者必须确认收到一条消息(或一组消息)。
示例:
您还可以检查以下内容:
oknwwptz2#
如何设置1+通道以向/从多个队列发布/使用?
你可以使用线程和通道来实现。你所需要的只是一种对事物进行分类的方法,即来自登录的所有队列项,来自security_events的所有队列元素等。分类可以使用routingKey来实现。
例如:每次当你添加一个项目到队列中时,你指定路由键。2它将作为一个属性元素被附加。3通过这个你可以从一个特定的事件中得到值,比如 * logging *。
下面的代码示例解释了如何在客户端完成此操作。
例如,如果您需要获取关于类型Login的所有通道,那么您必须将路由关键字指定为login或其他一些关键字来标识它。
螺纹部件
发布部分结束后,您可以运行线程部分。
在这一部分你可以得到的基础上发布的数据类别。即;路由关键字,在您的情况下是日志、security_events和customer_orders等。
查看示例以了解如何在线程中检索数据。
现在创建了一个处理登录类型(路由键)队列中的数据的线程。通过这种方式,您可以创建多个线程。每个线程用于不同的目的。
s5a0g9ez3#
对于您的特定情况(
logging
和customer_order
都需要5个线程),我将为logging
创建1个Channel
和1个Consumer
,为customer_order
创建1个Channel
和1个Consumer
。一个由logging
Consumer
使用,另一个由customer_order
Consumer
使用。请参见下面的消耗了解它为什么应该起作用。
PS:不要在
Consumer
内部创建线程池;还要注意Channel.basicConsume(...)
没有阻塞根据通道和并发注意事项(线程安全):
最好完全避免在共享通道上并发发布,例如,通过每个线程使用一个通道...在共享通道上的一个线程中消费并在另一个线程中发布可能是安全的。
很清楚...
Channel
可能(我说 * 可能 * 是因为这个原因)在同一个线程中运行它所有的Consumer
;通过订阅接收消息("推送API")几乎明确地传达了这一思想:每个通道都有其自己的调度线程。对于每个通道一个使用者的最常见使用情况,这意味着使用者不会阻碍其他使用者。如果每个通道有多个使用者,请注意长时间运行的使用者可能会阻碍向该通道上的其他使用者调度回调。
这意味着在某些情况下,属于同一个
Channel
的许多Consumer
会在同一个线程上运行,这样第一个会 * 阻碍回调的调度 *,因为 * dispatch * 这个词有时指的是"线程工作调度",而这里主要指的是调用Consumer.handleDelivery
(再次参见)。但是 * 自己的调度线程 * 是关于什么的呢?是关于与一起使用的线程池中的一个(请参见通道和并发注意事项(线程安全)):
服务器推送传递...使用
java.util.concurrent.ExecutorService
,每个连接一个。如果一个人有1个
Channel
和1个Consumer
,但是想要并行处理传入消息,那么他最好创建(Consumer
之外)并使用(Consumer
之内)他自己的线程池;因此,每个Consumer
接收的消息将在 * 用户 * 的线程池上处理,而不是在Channel
的 * 自己的调度线程 * 上处理。这种方法(从
Consumer
使用用户的线程池)是否可能/有效/可接受?如果是,请参见通道和并发注意事项(线程安全):接收传递的线程(例如,使用者#handleDelivery将传递处理委托给其他线程)...