欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

程序员文章站 2022-07-13 08:36:08
...

springcloudConfig 手动刷新的问题:
假如有多个客户端3344、3366、3377
每个微服务都要执行一次post请求,手动刷新
可否广播,一次通知,处处生效?

分布式自动刷新配置功能
SpringCloud Bus 配合 SpringCloud Config可以实现配置的动态刷新

1、SpringCloud Bus 消息总线

1.1 基本概念

Bus支持两种消息代理:RabbitMQ 和 Kafka

SpringCloud Bus 是用来将分布式系统的节点与轻量级消息系统连接起来的框架,它整合了Java的时间处理机制和消息中间件的功能。

SpringCloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改,时间推送等,也可以当做微服务之间的通信通道
SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

什么是总线:
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个消息主题,并让系统中的所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。 在总线上的各个实例,都可以方便的广播一些需要让其他连接载该主题上的实例都知道的消息。

基本原理
ConfigClient 实例都监听MQ 中的同一个Topic默认是(SpringCloud Bus)他会把这个信息放入Topic中,这样其他监听同一个Topic的服务就能得到通知,然后取更新自身的配置。

1.2 RabbitMQ环境配置

  1. 安装Erlang
  2. 安装RabbitMQ
  3. 进入RabbitMQ下的bin目录
  4. 输入一下命令启动安装功能 rabbit-plugins enable rabbitmq_management
  5. 可视化插件
  6. http://localhost:15672/
  7. 默认账号密码 guest guest

1.3 Bus动态刷新全局广播通知

制作3366
cloud-config-client-3366
3355也添加这个依赖

  <!--添加消息总线rabbitMQ支持-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

bootstrap.yml

server:
  port: 3366

spring:
  application:
    name: config-client
  cloud:
    #Config客户端配置
    config:
      label: master #分支名称
      name: config #配置文件名称
      profile: dev #读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取 http://config-3344.com:3344/master/config-dev.yml
      uri: http://localhost:3344 #配置中心地址
  #rabbit相关配置 15672是web管理界面的端口,5672是MQ访问的端口
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
#服务注册到eureka地址
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka

#暴露监控端点
management:
  endpoints:
    web:
      exposure:
        include: "*"

主启动类

@SpringBootApplication
@EnableEurekaClient
public class ConfigClientMain3366 {
    public static void main(String[] args) {
        SpringApplication.run(ConfigClientMain3366.class,args);
    }
}

ConfigClientController

@RestController
@RefreshScope
public class ConfigClientController {
    @Value("${server.port}")
    private String serverPort;
    @Value(("${config.info}"))
    private String configInfo;

    @GetMapping("/configInfo")
    public String getConfigInfo(){
        return "serverPort: "+serverPort+"\t\n\n configInfo: "+configInfo;
    }
}
  1. 利用消息总线触发一个客户端/bus/refresh,而刷新所有客户端配置
    1. 破坏了微服务职责的单一性,因为微服务本身就是业务模块,它本不应该承担刷新的职责
    2. 破坏了微服务节点的对等性
    3. 有一定局限性,例如,微服务在迁移的时候,它的网络地址常常发生变化,此时如果想要做到自动刷新,那就会增加更多的修改
  2. 利用消息总线触发一个服务端ConfigServer的bus/refresh端点,而刷新所有客户端配置
    SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

1.3.1 3344 3355 3366 添加消息总线

 <!--添加消息总线RbbitMQ支持-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

yml

#rabbitmq相关配置,暴露bus刷新配置的端点
management:
  endpoints:  #暴露bus刷新配置的端点
    web:
      exposure:
        include: 'bus-refresh'  #凡是暴露监控、刷新的都要有actuator依赖,bus-refresh就是actuator的刷新操作

3355

 <!--添加消息总线rabbitMQ支持-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

yml

#rabbit相关配置 15672是web管理界面的端口,5672是MQ访问的端口
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3366

 <!--添加消息总线rabbitMQ支持-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
#rabbit相关配置 15672是web管理界面的端口,5672是MQ访问的端口
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

1.3.2 测试

一次修改,广播通知,出处生效

  1. 在github上修改配置文件
  2. 发送post请求 cur -X POST "http://localhost:3344/actuator/bus-refresh" (向3344发post请求)
  3. 刷新3355、3366 localhost3366/configInfo localhost3355/configInfo

1.4 定点通知

只通知3355 3366
指定某一个具体实例生效
公式 :

http://localhost:配置中心的端口号/actuator/bus-refresh/{destination}

/bus/refresh请求不再发送到具体的服务实例上。

cur -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"

2、SpringCloud Stream 消息驱动

2.1 基本概念

解决的痛点:(MQ消息中间件)

  1. ActiveMQ
  2. RabbitMQ
  3. RocketMQ
  4. kafka

系统可能会同时存在两种RabbitMQ 和 kafka

举例:对于我们Java程序员来说,可能有时要使用ActiveMQ,有时要使用RabbitMQ,甚至还有RocketMQ以及Kafka,这之间的切换似乎很麻烦,我们很难,也没有太多时间去精通每一门技术,那有没有一种新技术的诞生,让我们不再关注具体MQ的细节,自动的给我们在各种MQ内切换。

有没有一种新的结束诞生,让我们不再关注具体的MQ的细节,我们只需要用一种适配绑定的方式,在各种MQ之间切换

cloud stream : 屏蔽底层消息中间件的差异,统一消息的编程模型

SpringCloud Stream是一个构建消息驱动微服务的框架

应用程序通过inputs 或者outputs 来与SpringCloud Stream 中的 binder对象交互
通过我们来binding,而Springcloud Stream的binder对象负责与消息中间件交互
所以我们只需要搞清楚如何与Springcloud Stream交互就可以党鞭的使用消息驱动的方式

通过spring Integeration来连接消息代理中间件以实现消息时间驱动
Springcloud Stream为一些供应商的消息中间件产品提供了葛新华的自动化配置实现,引用了发布订阅、消费组、分区三个概念

目前仅支持RabbitMQ kafka

2. 2 设计思想

标准MQ :
SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

  1. 生产者和消费者之间通过消息媒介传递信息内容
  2. 消息必须走特定的通道

为什么要用SpringCloud ?
比方说我们用到了RabbitMQ和Kafka,由于这两公分消息中间件的架构上的不同
像RabbitMQ有exchange,kafka有Topic 和 Partitions分区
SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,他们的实现细节上会有较大的差异性
通过定义绑定器作为中间层,完美的实现了应用程序与消息中间细节之间的隔离
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件的差异

通过定义绑定器Binder作为中间层,实现了应用程序和消息中间件的细节之间的隔离。

Binder :
input 消费者
output 生产者

SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

Stream的通信方式遵循了发布订阅模式 Topic 主题进行分区广播

  1. 在RabbitMQ就是Exchange
  2. 在kafka中就是Topic

2.3 编码API和常用注解

SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream
Binder : 很方便的连接中间件,屏蔽差异
Channel: 通道是队列Queue的一种抽象,在消息通讯系统中实现存储和转发的媒介,通过channel 对队列进行配置
Source 和 Sink: 消息的生产者和消费者

SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

2.4 生产者

cloud-stream-rabbitmq-provider8801

1.pom

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.appliaction.yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitMQ的服务信息
      defaultRabbit: # 表示定义的名称,用于binding的整合
        type: rabbit # 消息中间件类型
        environment: # 设置rabbitMQ的相关环境配置
          spring:
            rabbitmq:
              host: localhost
              port: 5672
              username: guest
              password: guest
      bindings: # 服务的整合处理
        ouput: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
    lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
    instance-id: send-8801.com # 主机名
    prefer-ip-address: true # 显示ip

3.service

//这不是传统的service,这是和rabbitmq打交道的,不需要加注解@Service
//这里不掉dao,掉消息中间件的service
//信道channel和exchange绑定在一起
//定义消息的推送管道
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("serial = " + serial);
        return null;
    }
}

controller

@RestController
public class SendMessageController {

    @Resource
    private IMessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

2.5 消费者

cloud-stream-rabbitmq-consumer8802

 <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

yml :
input: # 这个名字是一个通道的名称
instance-id: receive-8802.com #主机名

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitMQ的服务信息
        defaultRabbit: # 表示定义的名称,用于binding的整合
          type: rabbit # 消息中间件类型
          environment: # 设置rabbitMQ的相关环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          #group: atguiguA
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
    lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
    instance-id: receive-8802.com #主机名
    prefer-ip-address: true # 显示ip

主启动类 :


@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

controller :@EnableBinding(Sink.class)

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1号,------->接收到的消息: "+message.getPayload()+"\t port: "+serverPort);
    }
}

2.6 分组消费与持久化

7001 注册中心
8801 服务生产
8802 服务消费
8803 服务消费

cloud-stream-rabbitmq-consumer8803 同 8802

2.6.1 重复消费

目前是8802/8803都收到了消息,出现了重复消费,怎么解决?
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ获取订单信息,那如果一个订单通同时被两个服务获取到,那么就会造成数据错误我们得避免这种情况,这是我们就可以使用Stream中的消息分组来解决。
SpringCloud学习笔记-SpringCloud Bus、SpringCloudStream

注意在stream中处于同一个group中的多个消费者之间是竞争关系,就能够保证消息只会被其中一个应用消费一次,不同组是可以全面消费的(重复消费)

2.6.2 分组

故障现象:重复消费
导致原因: 默认分组group是不同过得,组流水号不一样,被认为不同组,可以重复消费

微服务应用放置在同一个group中,就能保证消息只会被其中一个应用消费一次
不同组是可以同时消费的,同一个组内会发生竞争关系,只有其中一个可以消费

自定义分组 :

8802 : group: atguiguA

bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: atguiguA

8803 : group: atguiguB

放在同一个分组,就不会出现重复消费
8802 : group: atguiguA
8803 : group: atguiguA

2.6.3 持久化

去掉8802的分组,8803的分组并没有去除掉
由于8802去掉了分组,8802从头到尾都没有去8801上取消息
8803 由于配置了分组,重启后8803会消费rabbitmq未曾消费过的消息

3、SpringCloud Sleuth 分布式请求链路追踪

问题:
在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协同产生最后的请求结果,每一个前段请求都会行程一条复杂的分布式服务调用链路,链路中的任何一环出现高演示或错误都会引起整个请求最后的失败。

相关标签: springcloud