RabbitMQ示例:多线程、通道和队列

yzckvree  于 2022-12-29  发布在  RabbitMQ
关注(0)|答案(3)|浏览(318)

我刚刚读了RabbitMQ's Java API docs,发现它信息量很大,而且很直接。这个例子说明了如何设置一个简单的Channel来发布/消费,非常容易理解。但是它是一个非常简单/基本的例子,它给我留下了一个重要的问题:如何设置1+ Channels以向多个队列发布/从多个队列使用?
假设我有一个RabbitMQ服务器,上面有3个队列:loggingsecurity_eventscustomer_orders。因此,我们要么需要一个Channel,以便能够向所有3个队列发布/消费,要么更有可能的是,使用3个单独的Channels,每个Channels专用于一个队列。
除此之外,RabbitMQ的最佳实践要求我们为每个消费者线程设置1个Channel。对于本例,假设security_events只需要1个消费者线程就可以了,但是loggingcustomer_order都需要5个线程来处理卷。那么,如果我理解正确的话,这是否意味着我们需要:

  • 1个Channel和1个用于向security_events发布/从security_events消费的消费者线程;以及
  • 5个Channels和5个用于向logging发布/从logging消费的消费者线程;以及
  • 5个Channels和5个用于向customer_orders和从customer_orders发布/使用的使用者线程?

如果我的理解有误,请纠正我。不管怎样,某个厌倦了战斗的RabbitMQ老手可以用一个像样的代码示例来帮助我“连接点”,以设置满足我这里要求的发布者/消费者吗?

l5tcr1uw

l5tcr1uw1#

我觉得你在初步认识上有几个问题,坦白说,看到以下几点我有点吃惊:both need 5 threads to handle the volume。你是如何确定你需要的确切数量的?你有任何保证5个线程将足够?
RabbitMQ经过了调优和时间测试,所以它完全是关于适当的设计和高效的消息处理。
让我们试着回顾一下这个问题并找到一个合适的解决方案。顺便说一句,消息队列本身并不能保证你有真正好的解决方案。你必须理解你在做什么,并做一些额外的测试。
正如你肯定知道有许多布局可能:

我将使用布局B作为说明1生产者N消费者问题的最简单方法。既然您如此担心吞吐量。顺便说一句,正如您所期望的那样,RabbitMQ表现得相当好(source)。请注意prefetchCount,我稍后将解决它:

因此,消息处理逻辑很可能是确保有足够吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨越一个新线程,但最终这种方法会杀死您的系统。基本上,线程越多,延迟就越大(如果需要,您可以选择Amdahl's law)。

(see Amdahl’s law illustrated

    • 提示1:小心使用线程,使用ThreadPools(details)**

线程池可以描述为Runnable对象(工作队列)的集合和正在运行的线程的连接。这些线程持续运行,并检查工作查询是否有新的工作。如果有新的工作要做,它们会执行这个Runnable。Thread类本身提供了一个方法,例如execute(Runnable r)来向工作队列添加新的Runnable对象。

public class Main {
  private static final int NTHREDS = 10;

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
    for (int i = 0; i < 500; i++) {
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // This will make the executor accept no new threads
    // and finish all existing threads in the queue
    executor.shutdown();
    // Wait until all threads are finish
    executor.awaitTermination();
    System.out.println("Finished all threads");
  }
}
    • 提示#2:小心消息处理开销**

我认为这是一种明显的优化技术。很可能你会发送小而容易处理的消息。整个方法就是不断地设置和处理小消息。大消息最终会开一个坏玩笑,所以最好避免。

因此,最好发送微小的信息片段,但是处理怎么办?每次提交作业都会产生开销。在传入消息率较高的情况下,批处理非常有用。

例如,假设我们有简单的消息处理逻辑,并且不希望每次处理消息时都有特定于线程的开销,为了优化这个非常简单的CompositeRunnable can be introduced

class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        for(Runnable r: queue) {
            r.run();
        }
    }
}

或者通过收集要处理的消息以稍微不同的方式执行相同的操作:

class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        for(T message: queue) {
            // process a message
        }
    }
}

通过这种方式,您可以更有效地处理消息。

    • 技巧3:优化消息处理**

尽管您知道可以并行处理消息(Tip #1)并减少处理开销(Tip #2),但您必须快速完成所有操作。冗余的处理步骤、繁重的循环等可能会严重影响性能。请参阅有趣的案例研究:

Improving Message Queue Throughput tenfold by choosing the right XML Parser

    • 技巧4:连接和通道管理**
  • 在现有连接上启动新通道涉及一个网络往返-启动新连接需要几个网络往返。
  • 每个连接都使用服务器上的文件描述符,而通道则不使用。
  • 在一个通道上发布大消息会在消息传出时阻塞连接,除此之外,复用是相当透明的。
  • 如果服务器过载,正在发布的连接可能会被阻塞-最好将发布连接和使用连接分开
  • 准备好处理突发消息

source
请注意,所有的提示是完美的工作在一起。随时让我知道如果你需要额外的细节。

    • 完整的消费者示例(source)**

请注意以下事项:

      • channel.basicQos(预取)**-如前所述,prefetchCount可能非常有用:

此命令允许使用者选择预取窗口,该窗口指定了它准备接收的未确认消息的数量。通过将预取计数设置为非零值,代理将不会向使用者传送任何超过该限制的消息。要向前移动窗口,使用者必须确认收到一条消息(或一组消息)。

      • ExecutorService线程执行器**-您可以指定正确配置的执行器服务。

示例:

static class Worker extends DefaultConsumer {

    String name;
    Channel channel;
    String queue;
    int processed;
    ExecutorService executorService;

    public Worker(int prefetch, ExecutorService threadExecutor,
                  , Channel c, String q) throws Exception {
        super(c);
        channel = c;
        queue = q;
        channel.basicQos(prefetch);
        channel.basicConsume(queue, false, this);
        executorService = threadExecutor;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        Runnable task = new VariableLengthTask(this,
                                               envelope.getDeliveryTag(),
                                               channel);
        executorService.submit(task);
    }
}

您还可以检查以下内容:

oknwwptz

oknwwptz2#

如何设置1+通道以向/从多个队列发布/使用?

你可以使用线程和通道来实现。你所需要的只是一种对事物进行分类的方法,即来自登录的所有队列项,来自security_events的所有队列元素等。分类可以使用routingKey来实现。
例如:每次当你添加一个项目到队列中时,你指定路由键。2它将作为一个属性元素被附加。3通过这个你可以从一个特定的事件中得到值,比如 * logging *。
下面的代码示例解释了如何在客户端完成此操作。

    • 例如:**
    • 路由关键字用于识别通道类型并检索类型。**

例如,如果您需要获取关于类型Login的所有通道,那么您必须将路由关键字指定为login或其他一些关键字来标识它。

Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            string routingKey="login";

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
    • 您可以查看here以了解有关分类的详细信息..**

螺纹部件

发布部分结束后,您可以运行线程部分。
在这一部分你可以得到的基础上发布的数据类别。即;路由关键字,在您的情况下是日志、security_events和customer_orders等。
查看示例以了解如何在线程中检索数据。

    • 例如:**
ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
//**The threads part is as follows** 
 channel.exchangeDeclare(EXCHANGE_NAME, "direct");      
 String queueName = channel.queueDeclare().getQueue();
    // This part will biend the queue with the severity (login for eg:)
    for(String severity : argv){
              channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    }
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();

             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
     }
 });

现在创建了一个处理登录类型(路由键)队列中的数据的线程。通过这种方式,您可以创建多个线程。每个线程用于不同的目的。

    • 有关线程部分的详细信息,请参阅here ..**
s5a0g9ez

s5a0g9ez3#

    • 直接回答**

对于您的特定情况(loggingcustomer_order都需要5个线程),我将为logging创建1个Channel和1个Consumer,为customer_order创建1个Channel和1个Consumer。一个由loggingConsumer使用,另一个由customer_orderConsumer使用。
请参见下面的消耗了解它为什么应该起作用。
PS:不要在Consumer内部创建线程池;还要注意Channel.basicConsume(...)没有阻塞

    • 发布**

根据通道和并发注意事项(线程安全):
最好完全避免在共享通道上并发发布,例如,通过每个线程使用一个通道...在共享通道上的一个线程中消费并在另一个线程中发布可能是安全的。
很清楚...

    • 消耗量**

Channel可能(我说 * 可能 * 是因为这个原因)在同一个线程中运行它所有的Consumer;通过订阅接收消息("推送API")几乎明确地传达了这一思想:
每个通道都有其自己的调度线程。对于每个通道一个使用者的最常见使用情况,这意味着使用者不会阻碍其他使用者。如果每个通道有多个使用者,请注意长时间运行的使用者可能会阻碍向该通道上的其他使用者调度回调
这意味着在某些情况下,属于同一个Channel的许多Consumer会在同一个线程上运行,这样第一个会 * 阻碍回调的调度 *,因为 * dispatch * 这个词有时指的是"线程工作调度",而这里主要指的是调用Consumer.handleDelivery(再次参见)。
但是 * 自己的调度线程 * 是关于什么的呢?是关于与一起使用的线程池中的一个(请参见通道和并发注意事项(线程安全)):
服务器推送传递...使用java.util.concurrent.ExecutorService,每个连接一个。

    • 结论**

如果一个人有1个Channel和1个Consumer,但是想要并行处理传入消息,那么他最好创建(Consumer之外)并使用(Consumer之内)他自己的线程池;因此,每个Consumer接收的消息将在 * 用户 * 的线程池上处理,而不是在Channel的 * 自己的调度线程 * 上处理。
这种方法(从Consumer使用用户的线程池)是否可能/有效/可接受?如果是,请参见通道和并发注意事项(线程安全):
接收传递的线程(例如,使用者#handleDelivery将传递处理委托给其他线程)...

相关问题