Rabbitmq 缓存数据库

x33g5p2x  于2021-11-19 转载在 RabbitMQ  
字(10.9k)|赞(0)|评价(0)|浏览(425)

Rabbitmq概念

消息队列、消息服务器、消息中间件Borker
常见消息服务器:Rabbitmq Activemq Rocketmq(阿里的) Kafka Tubemq(腾讯)

搭建Rabbitmq服务器

VMware

  • 16.x
  • NAT网段 192.168.64.0
  • 编辑–虚拟网络编辑器–选择vmnet8–左下角修改成192.168.64.0

虚拟机

centos-8-2105/centos-7-1908

  • 已经做了基本配置
    yun安装源、扩展源使用阿里服务器
    安装了python、pip、ansible
    添加了两个脚本文件,方便配置ip地址(ip-static固定ip/ip-dhcp自动获取ip)
  • 1.解压centos-8-2105
  • 2.双击centos-8-2105.vmx加载镜像
  • 3.启动,按提示选择已复制虚拟机
  • 4.登录用户名密码都是 root

ip设置测试

  1. ./ip-dhcp 自动获取ip
  2. 如果网卡有问题 执行以下命令
# centos 8 开启 VMware 托管 
nmcli n on systemctl restart NetworkManager
# centos 7 禁用 NetworkManager 系统服务 
systemctl stop NetworkManager systemctl disable NetworkManager
#如果网络还有问题,可以充值vmware虚拟网络
编辑--虚拟网络篇编辑器--还原默认设置
还原默认设置会删除所有虚拟网络,重新创建,重新初始化

默认分配的ip

静态设置ip

准备docker环境

1.关闭centos-8-2105

2.克隆centos-8-2105:docker-base

3.mobaxterm连接docker-base

ip:192.168.64.3
注意:先打开虚拟机

4.上传文件到/root

/DevOPs/Docker/docker-install

5. 离线安装docker(参考CSND笔记)

# 进入 docker-install 文件夹
cd docker-install
# 为 docker-install 添加执行权限
chmod +x install.sh

# 安装
./install.sh -f docker-20.10.6.tgz

6.关机

一般执行shutdown

Docker运行Rabbitmq

1.克隆docker-base :rabbitmq

2.设置ip

./ip-static
ip:192.168.64.140
ifconfig

3.上传rabbitmq镜像到/root

/docker/rabbit-image.gz

4.导入镜像

docker load -i rabbit-image.gz

5.启动rabbitmq

关闭防火墙

systemctl stop firewalld
systemctl disable firewalld
 
# 重启 docker 系统服务
systemctl restart docker

配置管理员用户名和密码

mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf

# 添加两行配置:
default_user = admin
default_pass = admin

启动Rabbitmq

docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management

访问网址http://192.168.64.140:15672

用户名/密码:admin

Rabbitmq使用场景

服务解耦

MQ Message Queue
数据吞吐量很大
数据排队

异步调用

feign是同步调用,效率低
mq异步调用

Rabbitmq六种收发消息方式

准备工作

新建工程

新建module

添加依赖

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
    </dependencies>

简单模式

只有一个消费者

生产者代码

package m1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*简单模式*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("helloworld",false,false,false,null);
        //3.向helloworld发送消息
        channel.basicPublish("", "helloworld",null,"Hello World".getBytes());
    }
}

channel.queueDeclare()参数说明

  • 参数:
  • 1.队列名
  • 2.是否是持久队列
  • 3.是否是排他队列,可共享(独占队列)
  • 4.是否能被服务器自动删除(没有消费者的时候服务器是否自动删除)
  • 5.队列的其他参数属性(键值对)
    channel.basicPublish参数说明
  • 1.交换机,空串是默认交换机
  • 3.消息的其他参数属性(键值对)

消费者代码

package m1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*简单模式--消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("helloworld",false,false,false,null);
        //创建回调对象
        //处理消息的回调函数
        DeliverCallback deliverCallback1=(consumerTag,message)->{
            byte[] a=message.getBody();
            String s=new String(a);
            System.out.println("收到"+s);
        };
        DeliverCallback deliverCallback=new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                byte[] a=message.getBody();
                String s=new String(a);
                System.out.println("收到"+s);
            }
        };
        //取消消息的回调对象
        CancelCallback cancelCallback=new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {

            }
        };
        //从helloworld接收消息,把消息传递到回调对象处理
// channel.basicConsume("helloworld",true,处理消息的回调对象,取消消息处理回调对象); channel.basicConsume("helloworld",true,deliverCallback1,cancelCallback);
    }
}

channel.basicConsume参数说明
第二个参数:确认方式 ACK --Acknowledgment

  • -true 自动确认
  • -false 手动确认(手动发送回执) 保证正确处理消息

工作模式

多个消费者

生产者代码

package m1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*简单模式--消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("helloworld",false,false,false,null);
        //创建回调对象
        //处理消息的回调函数
        DeliverCallback deliverCallback1=(consumerTag,message)->{
            byte[] a=message.getBody();
            String s=new String(a);
            System.out.println("收到"+s);
        };
        DeliverCallback deliverCallback=new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                byte[] a=message.getBody();
                String s=new String(a);
                System.out.println("收到"+s);
            }
        };
        //取消消息的回调对象
        CancelCallback cancelCallback=new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {

            }
        };
        //从helloworld接收消息,把消息传递到回调对象处理
// channel.basicConsume("helloworld",true,处理消息的回调对象,取消消息处理回调对象);
        /* * 第二个参数:确认方式 ACK --Acknowledgment * -true 自动确认 * -false 手动确认(手动发送回执) 保证正确处理消息 * */
channel.basicConsume("helloworld",true,deliverCallback1,cancelCallback);
    }
}

消费者代码
业务:模拟耗时消息

package m2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*工作模式--消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("helloworld",false,false,false,null);
        //回调对象
        DeliverCallback deliverCallback=(consumerTag,message)->{
            String s=new String(message.getBody());
            System.out.println(s);
            //遍历所有字符,遇到.暂停1s
            for (int i = 0; i < s.length(); i++) {
                if(s.charAt(i)=='.'){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("消息处理完成");
        };
        CancelCallback cancelCallback=(consumerTag)->{};
        //接收信息
        channel.basicConsume("helloworld",true,deliverCallback,cancelCallback);
    }
}

如何解决合理分发??

修改Consumer代码

package m2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*工作模式--消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("helloworld",false,false,false,null);
        //回调对象
        DeliverCallback deliverCallback=(consumerTag,message)->{
            String s=new String(message.getBody());
            System.out.println(s);
            //遍历所有字符,遇到.暂停1s
            for (int i = 0; i < s.length(); i++) {
                if(s.charAt(i)=='.'){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
// channel.basicAck(回执,是否同时确认收到过的所有消息);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("消息处理完成");
        };
        CancelCallback cancelCallback=(consumerTag)->{};
        //每次只收一条,处理完之前不收下一条
        channel.basicQos(1);
        //接收信息
        //第二个参数为false手动确认
        channel.basicConsume("helloworld",false,deliverCallback,cancelCallback);
    }
}

重启消费者模块测试

如何实现持久化??

  1. 队列持久化

修改生产者代码

package m2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/*工作模式--生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("task_queue",true,false,false,null);
        //循环输入消息发送
        while (true){
            System.out.println("输入消息:");
            String s=new Scanner(System.in).nextLine();
            channel.basicPublish("","task_queue",null,s.getBytes());
        }
    }
}

修改消费者代码

package m2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*工作模式--消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("task_queue",true,false,false,null);
        //回调对象
        DeliverCallback deliverCallback=(consumerTag,message)->{
            String s=new String(message.getBody());
            System.out.println(s);
            //遍历所有字符,遇到.暂停1s
            for (int i = 0; i < s.length(); i++) {
                if(s.charAt(i)=='.'){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
// channel.basicAck(回执,是否同时确认收到过的所有消息);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("消息处理完成");
        };
        CancelCallback cancelCallback=(consumerTag)->{};
        //每次只收一条,处理完之前不收下一条
        channel.basicQos(1);
        //接收信息
        //第二个参数为false手动确认
        channel.basicConsume("task_queue",false,deliverCallback,cancelCallback);
    }
}
  1. 消息持久化
    ```生产者代码修改`

package m2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/*工作模式--生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//收发消息5672
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();//创建连接
        Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
        //2.在服务器上创建 helloworld队列
        //队列如果已经存在,不会重复创建
        channel.queueDeclare("task_queue",true,false,false,null);
        //循环输入消息发送
        while (true){
            System.out.println("输入消息:");
            String s=new Scanner(System.in).nextLine();
            channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_BASIC,s.getBytes());
        }
    }
}

重启rabbit测试

相关文章