Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)【一】【SpringBoot项目实现商品服务器端调用】

x33g5p2x  于2021-12-30 转载在 Java  
字(12.4k)|赞(0)|评价(0)|浏览(317)
SpringCloud学习目录点击跳转对应的文章
Java之 Spring Cloud 微服务搭建(第一个阶段)【一】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务 Eureka (第一个阶段)【二】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建Ribbon(第一个阶段)【三】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建 Consul(第一个阶段)【四】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建 Feign组件(第二个阶段)【一】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建 Hystrix (第二个阶段)【二】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建Sentinel (第二个阶段)【三】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建网关 nginx,Zuul(第三个阶段)【一】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建网关SpringCloud Gateway微服务网关GateWay(第三个阶段)【二】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务的链路追踪 Sleuth 和 Zipkin(第三个阶段)【三】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)【一】【SpringBoot项目实现商品服务器端调用】
Java之 Spring Cloud 微服务的 SpringCloud Config 配置中心(第四个阶段)【二】【SpringBoot项目实现商品服务器端调用】
Java之 Spring Cloud 微服务的开源配置中心Apollo(第四个阶段)【三】【SpringBoot项目实现商品服务器端调用】

在实际的企业开发中,消息中间件是至关重要的组件之一。

消息中间件主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。

一、概述

Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。
通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。

说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费

1、核心概念

绑定器

Binder 绑定器是Spring Cloud Stream中一个非常重要的概念。

在没有绑定器这个概念的情况下,
我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,

这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,
当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。

通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。

当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。

甚至可以任意的改变中间件的类型而不需要修改一行代码。

Spring Cloud Stream支持各种binder实现

通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binder 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。

发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,
当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。

这里所提到的 Topic 主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。

在不同的消息中间件中, Topic 可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。

二、入门案例

1、 准备工作

案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装

2、工程搭建

(1)创建工程并引入坐标

  • 创建stream_producer

  • 创建stream_consumer

(2)引入依赖

  • stream_producer当中

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
  • stream_consumer当中

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>

3、消息生产者

(1)设置 stream_producer当中设置配置文件

server:
  port: 7001 #服务端口
spring:
  application:
    name: stream_producer #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: itbluebox-default  #指定消息发送的目的地,在rabbitmq当中,发送一个itbluebox-default的exchange
      binders: #配置绑定器
        defaultRabbit:
          type: rabbit

(2)stream_producer当中编写启动类

package cn.itbluebox.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
/* 入门案例 1、引入依赖 2、配置application.yml文件 3、发送消息的话,定义一个通道的接口,通道接口当中内置的 MessageChannel 在SpringCloudStream当中内置了接口:Source 4、@enableBinding: 绑定对应的通道 5、发送消息的话,通过 MessageChannel 发送消息 * 如果需要messageChannel --> 通过绑定的内置接口获取 */
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication implements CommandLineRunner {
    @Autowired
    private MessageChannel output;
    @Override
    public void run(String... args) throws Exception {
        //发送消息
        //MessageBuilder : 工具类,创建消息
        output.send(MessageBuilder.withPayload("hello 蓝盒子").build());
    }
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class,args);
    }
}

(3)运行测试

访问:RabbitMQ http://localhost:15672/#/exchanges

4、 消息消费者

(1)设置消费者配置文件,stream_consumer

创建application.yml

server:
  port: 7002 #服务端口
spring:
  application:
    name: rabbitmq_consumer #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置的获取消息的通道,从itbluebox-default当中获取消息
          destination: itbluebox-default
      binders:
        defaultRabbit:
          type: rabbit

(2)编写测试类

/* * 1、引入依赖 * 2、配置Application.yml * 3、需要配置一个通道的接口 * 内置获取消息的通道接口 sink * 4、绑定通道 * 5、配置一个监听方法 : 当程序从中间件获取数据之后,执行的业务逻辑的方法 * 需要在监听方法上配置@StreamListener */
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {
    //监听binding中的消息
    @StreamListener(Sink.INPUT)
    public void input(String message){
        System.out.println("获取消息:"+message);
    }
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class,args);
    }

}

运行测试

清空一下控制台

我们重新运行消息生产者

观察消息消费者

5、 对上述的代码进行优化

(1)消息生产者代码优化

创建对应的工具类

  • 创建MessageSender

/* 负责向中间件发送数据 */
@Component
@EnableBinding(Source.class)
public class MessageSender {
    @Autowired
    private MessageChannel output;
    //发送消息
    public void send(Object obj){
        output.send(MessageBuilder.withPayload(obj).build());
    }
}
  • 修改启动类

@SpringBootApplication
public class ProducerApplication  {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class,args);
    }
}
  • 编写测试类

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
    @Autowired
    private MessageSender messageSender;
    @Test
    public void testSend(){
        messageSender.send("hello 工具类");
    }
}

(2)消息消费者代码优化

  • 创建MessageListener

@Component
@EnableBinding(Sink.class)
public class MessageListener {

    //监听binding中的消息
    @StreamListener(Sink.INPUT)
    public void input(String message){
        System.out.println("获取消息:"+message);
    }

}
  • 修改启动类

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class,args);
    }
}

(3)启动测试

启动stream_consumer

启动成功邮件清空一下控制台

消息的生产者stream_producer执行一下单元测试

运行成功

观察ConsumerApplication控制台

三、自定义消息通道

Spring Cloud Stream 内置了两种接口,
分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,
而在我们实际使用中,往往是需要定义各种输入输出流。

使用方法也很简单。

1、设置消息的生产者

(1)创建MyProcessor

/** * 自定义的消息通道 */
public interface MyProcessor {

	/** * 消息生产者的配置 */
	String MYOUTPUT = "myoutput";

	@Output("myoutput")
	MessageChannel myoutput();

	/** * 消息消费者的配置 */
	String MYINPUT = "myinput";

	@Input("myinput")
	SubscribableChannel myinput();
}

(2)创建修改MessageSender

/* 负责向中间件发送数据 */
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {
    @Autowired
    @Qualifier(value = "myoutput")
    private MessageChannel myoutput;
    //发送消息
    public void send(Object obj){
        myoutput.send(MessageBuilder.withPayload(obj).build());
    }
}

(3)设置配置文件

myoutput:
          destination: itbluebox-custom-default

2、设置消息的生产者

(1)创建MyProcessor

/** * 自定义的消息通道 */
public interface MyProcessor {

	/** * 消息生产者的配置 */
	String MYOUTPUT = "myoutput";

	@Output("myoutput")
	MessageChannel myoutput();

	/** * 消息消费者的配置 */
	String MYINPUT = "myinput";

	@Input("myinput")
	SubscribableChannel myinput();
}

(2)创建MyProcessor

@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {

    //监听binding中的消息
    @StreamListener(MyProcessor.MYINPUT)
    public void input(String message){
        System.out.println("获取消息:"+message);
    }

}

(3)设置application.yml配置文件

myinput:
          destination: itbluebox-custom-default

3、运行测试

重新启动消费者

运行成功并清空控制台

运行消息生产者

运行成功

观察ConsumerApplication

四、 消息分组

1、工程准备

  • 创建第二个消费者

  • 添加依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
  • 创建对应需要的启动类和其类,这里和之前的一样之前复制stream_consumer工程当中的内容即可

修改一下启动类的名称

  • 设置配置文件,设置其端口号7003

server:
  port: 7003 #服务端口
spring:
  application:
    name: rabbitmq-consumer2 #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置的获取消息的通道 , 从itcast-default中获取消息
          destination: itbluebox-default
        myinput:
          destination: itbluebox-custom-default
      binders:
        defaultRabbit:
          type: rabbit
  • 查看运行效果
    运行stream_consumer

运行stream_consumer2

情况两个项目的控制台

运行消息生产者

ConsumerApplication接收到了消息

ConsumerApplication2也接收到了消息

2、消息分组介绍

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。

默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

实现的方式非常简单,我们只需要在服务消费者端设置
spring.cloud.stream.bindings.input.group 属性即可,比如我们可以这样实现:

(1)设置配置文件

  • 在stream_consumer当中的配置文件

group: group1
  • 在stream_consumer_2当中配置

group: group1 #设置消息的组名称(同名组中的多个消费者,只会有一个消费消息)

(2)运行测试

重新启动运行测试

运行消息生产者

ConsumerApplication当中接收到了消息

ConsumerApplication2当中没有接收到消息

五、 消息分区

有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例。

简单的来说就是生产者生产的不同的消息被不同的消费者消费(区分不同的消息进行消费,对消息进行归类划分)

1、消息生产者配置

producer:
            partition-key-expression: payload  #分区的关键字
            partition-count: 2  #分区大小

2、消息消费者配置

  • stream_consumer

server:
  port: 7002 #服务端口
spring:
  application:
    name: rabbitmq-consumer #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      instanceCount: 2  #代表是消费者总数
      instanceIndex: 0  #当前消费者的索引
      bindings:
        input: #内置的获取消息的通道 , 从itcast-default中获取消息
          destination: itbluebox-default
        myinput:
          destination: itbluebox-custom-default
          group: group1
          consumer:
            partitioned: true  #开启分区支持
      binders:
        defaultRabbit:
          type: rabbit
  • 在stream_consumer_2当中设置信息

server:
  port: 7003 #服务端口
spring:
  application:
    name: rabbitmq-consumer2 #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      instanceCount: 2  #代表是消费者总数
      instanceIndex: 1  #当前消费者的索引
      bindings:
        input: #内置的获取消息的通道 , 从itcast-default中获取消息
          destination: itbluebox-default
        myinput:
          destination: itbluebox-custom-default
          group: group1
          consumer:
            partitioned: true  #开启分区支持
      binders:
        defaultRabbit:
          type: rabbit

从上面的配置中,我们可以看到增加了这两个参数:

  1. pring.cloud.stream.bindings.output.producer.partitionKeyExpression :
    通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
  2. spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分
    区的数量。
    到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,
    但需要注意的是要为消费者指定不同的实例索引号,
    这样当同一个消息被发给消费组时,
    我们可以发现只有一个消费实例在接收和处理这些相同的消息。

3、运行测试

  • 修改stream_producer测试类

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
    @Autowired
    private MessageSender messageSender;
    @Test
    public void testSend(){
        for(int i = 0;i < 5;i++){
            messageSender.send("1");
        }
    }
}
  • 重新启动消费者一和消费者二

同时清空一下两个消费者的控制台,方便查看信息

运行生产者

观察消费者,我没看到在ConsumerApplication2当中得到了消息

修改测试代码设置发送0

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
    @Autowired
    private MessageSender messageSender;
    @Test
    public void testSend(){
        for(int i = 0;i < 5;i++){
            messageSender.send("0");
        }
    }
}

运行测试

在ConsumerApplication 当中得到了消息

SpringCloud学习目录点击跳转对应的文章
Java之 Spring Cloud 微服务搭建(第一个阶段)【一】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务 Eureka (第一个阶段)【二】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建Ribbon(第一个阶段)【三】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建 Consul(第一个阶段)【四】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建 Feign组件(第二个阶段)【一】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建 Hystrix (第二个阶段)【二】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建Sentinel (第二个阶段)【三】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建网关 nginx,Zuul(第三个阶段)【一】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务搭建网关SpringCloud Gateway微服务网关GateWay(第三个阶段)【二】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务的链路追踪 Sleuth 和 Zipkin(第三个阶段)【三】【SpringBoot项目实现商品服务器端是调用】
Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)【一】【SpringBoot项目实现商品服务器端调用】
Java之 Spring Cloud 微服务的 SpringCloud Config 配置中心(第四个阶段)【二】【SpringBoot项目实现商品服务器端调用】
Java之 Spring Cloud 微服务的开源配置中心Apollo(第四个阶段)【三】【SpringBoot项目实现商品服务器端调用】

相关文章