RabbitMQ02_HelloWorld模型的消息生产和消费

x33g5p2x  于2021-12-19 转载在 其他  
字(4.4k)|赞(0)|评价(0)|浏览(407)

RabbitMQ02_HelloWorld模型的消息生产和消费

  • 引入RabbitMQ的相关依赖:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>
  • 测试消息生产者:
@Test
public void testSendMessage() throws IOException, TimeoutException {

    //创建连接mq的连接工厂对象
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置连接mq的主机
    connectionFactory.setHost("118.31.106.51");
    //设置端口号
    connectionFactory.setPort(5672);
    //设置连接哪个虚拟主机
    connectionFactory.setVirtualHost("/ems");
    //设置访问虚拟主机的用户名和密码
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("123456");
    //获取连接对象
    Connection connection = connectionFactory.newConnection();
    //获取连接中的通道
    Channel channel = connection.createChannel();
    //通道绑定对应消息队列,参数1:队列名称(如果不存在则会自动创建)
    //参数2:定义队列特性是否要持久化 参数3:是否独占队列 参数4:是否消费完成后自动删除队列
    //参数5:附加参数
    channel.queueDeclare("hello",false,false,false,null);
    //发布消息
    //参数1:交换机名称 参数2:队列名称 参数3:传递消息的额外参数
    //参数4:消息的具体内容
    channel.basicPublish("", "hello", null, "hello RabbitMQ".getBytes());
    //关闭通道和连接
    channel.close();
    connection.close();

}
  • 测试消息消费者
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("118.31.106.51");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/ems");
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("123456");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    // 消费消息 参数1:消息队列名称 参数2:开启消息的自动确认机制
    // 参数3:消费时的回调接口
    channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        //从最后一个参数(body)中取出消息
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("body=="+new String(body));
        }
    });

    //如果不关闭通道和连接,主函数将持续消费消息,否则只消费一次消息
    //channel.close();
    //connection.close();
}
  • 代码优化:封装连接工具类
package com.blu.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {
	
	private static ConnectionFactory connectionFactory;
	
	static {
		connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("118.31.106.51");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/ems");
		connectionFactory.setUsername("ems");
		connectionFactory.setPassword("123456");
	}

	// 定义提供连接对象的方法
	public static Connection getConnection() {
		try {
			
			return connectionFactory.newConnection();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;

	}

	// 关闭通道和连接的方法
	public static void closeConnectionAndChannel(Channel channel, Connection connection) {
		try {
			if(channel!=null) {
				channel.close();
			}
			if(connection!=null) {
				connection.close();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}

}
  • 优化消息生产者:
@Test
public void testSendMessage() throws IOException, TimeoutException {	
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello",false,false,false,null);
    channel.basicPublish("", "hello", null, "hello RabbitMQ".getBytes());
    RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}
  • 优化消息消费者:
public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("body=="+new String(body));
        }
    });
    //RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}
  • 参数补充:
@Test
public void testSendMessage() throws IOException, TimeoutException {	
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //参数2:表示允许队列持久化 参数3:是否独占队列 参数4:是否消费完成后自动删除队列
    //参数5:附加参数
    channel.queueDeclare("hello",true,false,false,null);
    //参数1:交换机名称 参数3:传递消息的额外参数(MessageProperties.PERSISTENT_TEXT_PLAIN表示支持消息的持久化)
    channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello RabbitMQ".getBytes());
    RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}

相关文章