消息队列、消息服务器、消息中间件Borker
常见消息服务器:Rabbitmq Activemq Rocketmq(阿里的) Kafka Tubemq(腾讯)
centos-8-2105/centos-7-1908
./ip-dhcp 自动获取ip
# centos 8 开启 VMware 托管
nmcli n on systemctl restart NetworkManager
# centos 7 禁用 NetworkManager 系统服务
systemctl stop NetworkManager systemctl disable NetworkManager
#如果网络还有问题,可以充值vmware虚拟网络
编辑--虚拟网络篇编辑器--还原默认设置
还原默认设置会删除所有虚拟网络,重新创建,重新初始化
默认分配的ip
静态设置ip
ip:192.168.64.3
注意:先打开虚拟机
/DevOPs/Docker/docker-install
# 进入 docker-install 文件夹
cd docker-install
# 为 docker-install 添加执行权限
chmod +x install.sh
# 安装
./install.sh -f docker-20.10.6.tgz
一般执行shutdown
./ip-static
ip:192.168.64.140
ifconfig
/docker/rabbit-image.gz
docker load -i rabbit-image.gz
关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
# 重启 docker 系统服务
systemctl restart docker
配置管理员用户名和密码
mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf
# 添加两行配置:
default_user = admin
default_pass = admin
启动Rabbitmq
docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management
访问网址http://192.168.64.140:15672
用户名/密码:admin
MQ Message Queue
数据吞吐量很大
数据排队
feign是同步调用,效率低
mq异步调用
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
</dependencies>
只有一个消费者
生产者代码
package m1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*简单模式*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("helloworld",false,false,false,null);
//3.向helloworld发送消息
channel.basicPublish("", "helloworld",null,"Hello World".getBytes());
}
}
channel.queueDeclare()参数说明
消费者代码
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*简单模式--消费者*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("helloworld",false,false,false,null);
//创建回调对象
//处理消息的回调函数
DeliverCallback deliverCallback1=(consumerTag,message)->{
byte[] a=message.getBody();
String s=new String(a);
System.out.println("收到"+s);
};
DeliverCallback deliverCallback=new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] a=message.getBody();
String s=new String(a);
System.out.println("收到"+s);
}
};
//取消消息的回调对象
CancelCallback cancelCallback=new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//从helloworld接收消息,把消息传递到回调对象处理
// channel.basicConsume("helloworld",true,处理消息的回调对象,取消消息处理回调对象); channel.basicConsume("helloworld",true,deliverCallback1,cancelCallback);
}
}
channel.basicConsume参数说明
第二个参数:确认方式 ACK --Acknowledgment
多个消费者
生产者代码
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*简单模式--消费者*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("helloworld",false,false,false,null);
//创建回调对象
//处理消息的回调函数
DeliverCallback deliverCallback1=(consumerTag,message)->{
byte[] a=message.getBody();
String s=new String(a);
System.out.println("收到"+s);
};
DeliverCallback deliverCallback=new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] a=message.getBody();
String s=new String(a);
System.out.println("收到"+s);
}
};
//取消消息的回调对象
CancelCallback cancelCallback=new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//从helloworld接收消息,把消息传递到回调对象处理
// channel.basicConsume("helloworld",true,处理消息的回调对象,取消消息处理回调对象);
/* * 第二个参数:确认方式 ACK --Acknowledgment * -true 自动确认 * -false 手动确认(手动发送回执) 保证正确处理消息 * */
channel.basicConsume("helloworld",true,deliverCallback1,cancelCallback);
}
}
消费者代码
业务:模拟耗时消息
package m2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*工作模式--消费者*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("helloworld",false,false,false,null);
//回调对象
DeliverCallback deliverCallback=(consumerTag,message)->{
String s=new String(message.getBody());
System.out.println(s);
//遍历所有字符,遇到.暂停1s
for (int i = 0; i < s.length(); i++) {
if(s.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("消息处理完成");
};
CancelCallback cancelCallback=(consumerTag)->{};
//接收信息
channel.basicConsume("helloworld",true,deliverCallback,cancelCallback);
}
}
修改Consumer代码
package m2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*工作模式--消费者*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("helloworld",false,false,false,null);
//回调对象
DeliverCallback deliverCallback=(consumerTag,message)->{
String s=new String(message.getBody());
System.out.println(s);
//遍历所有字符,遇到.暂停1s
for (int i = 0; i < s.length(); i++) {
if(s.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// channel.basicAck(回执,是否同时确认收到过的所有消息);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
System.out.println("消息处理完成");
};
CancelCallback cancelCallback=(consumerTag)->{};
//每次只收一条,处理完之前不收下一条
channel.basicQos(1);
//接收信息
//第二个参数为false手动确认
channel.basicConsume("helloworld",false,deliverCallback,cancelCallback);
}
}
重启消费者模块测试
修改生产者代码
package m2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/*工作模式--生产者*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("task_queue",true,false,false,null);
//循环输入消息发送
while (true){
System.out.println("输入消息:");
String s=new Scanner(System.in).nextLine();
channel.basicPublish("","task_queue",null,s.getBytes());
}
}
}
修改消费者代码
package m2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*工作模式--消费者*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("task_queue",true,false,false,null);
//回调对象
DeliverCallback deliverCallback=(consumerTag,message)->{
String s=new String(message.getBody());
System.out.println(s);
//遍历所有字符,遇到.暂停1s
for (int i = 0; i < s.length(); i++) {
if(s.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// channel.basicAck(回执,是否同时确认收到过的所有消息);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
System.out.println("消息处理完成");
};
CancelCallback cancelCallback=(consumerTag)->{};
//每次只收一条,处理完之前不收下一条
channel.basicQos(1);
//接收信息
//第二个参数为false手动确认
channel.basicConsume("task_queue",false,deliverCallback,cancelCallback);
}
}
package m2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/*工作模式--生产者*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//收发消息5672
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();//创建连接
Channel channel = con.createChannel();//通信的通道 与服务器通信使用channel
//2.在服务器上创建 helloworld队列
//队列如果已经存在,不会重复创建
channel.queueDeclare("task_queue",true,false,false,null);
//循环输入消息发送
while (true){
System.out.println("输入消息:");
String s=new Scanner(System.in).nextLine();
channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_BASIC,s.getBytes());
}
}
}
重启rabbit测试
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_55740233/article/details/121415357
内容来源于网络,如有侵权,请联系作者删除!