Rabbitmq -如何在交换机上收听消息

ar7v8xwq  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(3)|浏览(176)

我有一个用Java编写的程序,可以向RabbitMQ发送消息。我所知道的只是交换名称。没有队列、绑定等等。
我的问题是这样的:如果只知道交换名称,我如何查看程序是否成功发送这些文件?

  • 谢谢-谢谢
    顺祝商祺,Serban
ifsvaxew

ifsvaxew1#

您可以使用RabbitMQ启用publisher confirmation,这就像有一个send-transaction,RabbitMQ会告诉您消息是否发送成功。

zqdjd7g9

zqdjd7g92#

假设我们有RabbitMQExchange,我们需要创建一个队列来将消息推送到交换,并从队列中使用它,如下所示

private static final String EXCHANGE_NAME = "2022";
    private static final String QUEUE_NAME = "2022";
    private final static boolean durable = true;

// now we need to create a connection to rabbitmq server //

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection conn = factory.newConnection();
       // create rabbitmq connection chaneel        
       Channel channel = conn.createChannel();
      //Declare Exchange // 
      channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
     // push message to rabbitmq exchange 

channel.basicPublish(EXCHANGE_NAME, "routingkey", null, yourmessage.getBytes());

以上是作为生产者工作,现在我们需要创建队列消费者

private static final String EXCHANGE_NAME = "2022";
    private static final String QUEUE_NAME = "2022";
    private final static boolean durable = true;

// now we need to create a connection to rabbitmq server //

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection conn = factory.newConnection();
        // create rabbitmq connection chaneel       
        Channel channel = conn.createChannel();

          channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

          //Queue Declare //
          channel.queueDeclare(QUEUE_NAME, true, false, false, null);
         //Queue bind //
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey");
        // Queue Consume // 
        QueueingConsumer consumer = new QueueingConsumer(channel);

  while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");

        }
ercv8c1e

ercv8c1e3#

请看这里:https://www.rabbitmq.com/tutorials/tutorial-three-java.html

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "EXCHANGE_NAME", "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, 
                             Envelope envelope,
                             AMQP.BasicProperties properties, 
                             byte[] body) throws IOException 
{
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
  }
};
channel.basicConsume(queueName, true, consumer);

三言两语,就得:
1.创建一个队列,在本例中为匿名队列
1.将队列绑定到Exchange
由于绑定可以在fanouttopicdirect之间更改,因此了解exchange的类型非常重要
在本例中为fanout

相关问题