在实际的企业开发中,消息中间件是至关重要的组件之一。
消息中间件主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。
Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。
通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
绑定器
Binder 绑定器是Spring Cloud Stream中一个非常重要的概念。
在没有绑定器这个概念的情况下,
我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,
这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,
当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。
通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。
当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。
甚至可以任意的改变中间件的类型而不需要修改一行代码。
Spring Cloud Stream支持各种binder实现
通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binder 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。
发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,
当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。
这里所提到的 Topic 主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。
在不同的消息中间件中, Topic 可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。
案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
server:
port: 7001 #服务端口
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output:
destination: itbluebox-default #指定消息发送的目的地,在rabbitmq当中,发送一个itbluebox-default的exchange
binders: #配置绑定器
defaultRabbit:
type: rabbit
package cn.itbluebox.stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
/* 入门案例 1、引入依赖 2、配置application.yml文件 3、发送消息的话,定义一个通道的接口,通道接口当中内置的 MessageChannel 在SpringCloudStream当中内置了接口:Source 4、@enableBinding: 绑定对应的通道 5、发送消息的话,通过 MessageChannel 发送消息 * 如果需要messageChannel --> 通过绑定的内置接口获取 */
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication implements CommandLineRunner {
@Autowired
private MessageChannel output;
@Override
public void run(String... args) throws Exception {
//发送消息
//MessageBuilder : 工具类,创建消息
output.send(MessageBuilder.withPayload("hello 蓝盒子").build());
}
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class,args);
}
}
访问:RabbitMQ http://localhost:15672/#/exchanges
创建application.yml
server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq_consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #内置的获取消息的通道,从itbluebox-default当中获取消息
destination: itbluebox-default
binders:
defaultRabbit:
type: rabbit
/* * 1、引入依赖 * 2、配置Application.yml * 3、需要配置一个通道的接口 * 内置获取消息的通道接口 sink * 4、绑定通道 * 5、配置一个监听方法 : 当程序从中间件获取数据之后,执行的业务逻辑的方法 * 需要在监听方法上配置@StreamListener */
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {
//监听binding中的消息
@StreamListener(Sink.INPUT)
public void input(String message){
System.out.println("获取消息:"+message);
}
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
运行测试
清空一下控制台
我们重新运行消息生产者
观察消息消费者
创建对应的工具类
/* 负责向中间件发送数据 */
@Component
@EnableBinding(Source.class)
public class MessageSender {
@Autowired
private MessageChannel output;
//发送消息
public void send(Object obj){
output.send(MessageBuilder.withPayload(obj).build());
}
}
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class,args);
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend(){
messageSender.send("hello 工具类");
}
}
@Component
@EnableBinding(Sink.class)
public class MessageListener {
//监听binding中的消息
@StreamListener(Sink.INPUT)
public void input(String message){
System.out.println("获取消息:"+message);
}
}
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
启动stream_consumer
启动成功邮件清空一下控制台
消息的生产者stream_producer执行一下单元测试
运行成功
观察ConsumerApplication控制台
Spring Cloud Stream 内置了两种接口,
分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,
而在我们实际使用中,往往是需要定义各种输入输出流。
使用方法也很简单。
/** * 自定义的消息通道 */
public interface MyProcessor {
/** * 消息生产者的配置 */
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
/** * 消息消费者的配置 */
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
/* 负责向中间件发送数据 */
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {
@Autowired
@Qualifier(value = "myoutput")
private MessageChannel myoutput;
//发送消息
public void send(Object obj){
myoutput.send(MessageBuilder.withPayload(obj).build());
}
}
myoutput:
destination: itbluebox-custom-default
/** * 自定义的消息通道 */
public interface MyProcessor {
/** * 消息生产者的配置 */
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
/** * 消息消费者的配置 */
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {
//监听binding中的消息
@StreamListener(MyProcessor.MYINPUT)
public void input(String message){
System.out.println("获取消息:"+message);
}
}
myinput:
destination: itbluebox-custom-default
重新启动消费者
运行成功并清空控制台
运行消息生产者
运行成功
观察ConsumerApplication
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
修改一下启动类的名称
server:
port: 7003 #服务端口
spring:
application:
name: rabbitmq-consumer2 #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #内置的获取消息的通道 , 从itcast-default中获取消息
destination: itbluebox-default
myinput:
destination: itbluebox-custom-default
binders:
defaultRabbit:
type: rabbit
运行stream_consumer2
情况两个项目的控制台
运行消息生产者
ConsumerApplication接收到了消息
ConsumerApplication2也接收到了消息
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。
默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
实现的方式非常简单,我们只需要在服务消费者端设置
spring.cloud.stream.bindings.input.group 属性即可,比如我们可以这样实现:
group: group1
group: group1 #设置消息的组名称(同名组中的多个消费者,只会有一个消费消息)
重新启动运行测试
运行消息生产者
ConsumerApplication当中接收到了消息
ConsumerApplication2当中没有接收到消息
有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例。
简单的来说就是生产者生产的不同的消息被不同的消费者消费(区分不同的消息进行消费,对消息进行归类划分)
producer:
partition-key-expression: payload #分区的关键字
partition-count: 2 #分区大小
server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
instanceCount: 2 #代表是消费者总数
instanceIndex: 0 #当前消费者的索引
bindings:
input: #内置的获取消息的通道 , 从itcast-default中获取消息
destination: itbluebox-default
myinput:
destination: itbluebox-custom-default
group: group1
consumer:
partitioned: true #开启分区支持
binders:
defaultRabbit:
type: rabbit
server:
port: 7003 #服务端口
spring:
application:
name: rabbitmq-consumer2 #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
instanceCount: 2 #代表是消费者总数
instanceIndex: 1 #当前消费者的索引
bindings:
input: #内置的获取消息的通道 , 从itcast-default中获取消息
destination: itbluebox-default
myinput:
destination: itbluebox-custom-default
group: group1
consumer:
partitioned: true #开启分区支持
binders:
defaultRabbit:
type: rabbit
从上面的配置中,我们可以看到增加了这两个参数:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend(){
for(int i = 0;i < 5;i++){
messageSender.send("1");
}
}
}
同时清空一下两个消费者的控制台,方便查看信息
运行生产者
观察消费者,我没看到在ConsumerApplication2当中得到了消息
修改测试代码设置发送0
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend(){
for(int i = 0;i < 5;i++){
messageSender.send("0");
}
}
}
运行测试
在ConsumerApplication 当中得到了消息
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_44757034/article/details/121873346
内容来源于网络,如有侵权,请联系作者删除!