返回 导航

SpringBoot / Cloud

hangge.com

消息总线Spring Cloud Bus使用详解1(使用RabbitMQ)

作者:hangge | 2020-10-29 08:10

一、整合 RabbitMQ 实现消息总线

1,消息总线介绍

  • 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费所以我们称它为消息总线。
  • 在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如配置信息的变更或者其他一些管理操作等。
  • 消息总线在微服务架构系统中被广泛使用,所以它同配置中心一样,几乎是微服务架构中的必备组件。

2,Spring Cloud Bus 介绍

  • Spring Cloud 作为微服务架构综合性的解决方案,也有自己的消息总线实现方案:Spring Cloud Bus
  • 通过使用 Spring Cloud Bus,可以非常容易地搭建起消息总线,同时实现了一些消息总线中的常用功能。比如:配合 Spring Cloud Config 实现微服务应用配置信息的动态更新等。
  • 当前版本的 Spring Cloud Bus 仅支持两款消息中间件产品:RabbitMQKafka

3,准备工作

(1)本文介绍如何使用 RabbitMQ 实现消息总线,所以首次要安装好 RabbitMQ 环境,具体步骤可以参考我之前写的文章:

(2)本文演示配合 Spring Cloud Config 实现微服务应用配置信息的动态更新,所以需要准备好相关的几个工程:eureka 服务注册中心、配置中心 config-server、客户端 hangge-client 以及 Git 仓库中的配置文件。具体内容可以参考我之前写的文章:

4,开始整合

(1)首先修改客户端 hangge-client pom.xml 文件,增加 spring-cloud-starter-bus-amqp 模块。
注意spring-boot-starter-actuator 模块也是必须的,用来提供刷新端点。
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

(2)接着在配置文件中增加关于 RabbitMQ 的连接和用户信息。
spring.rabbitmq.host=192.168.60.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=hangge
spring.rabbitmq.password=123

5,运行测试

(1)启动 eureka 服务注册中心、配置中心 config-server、以及两个 hangge-client 客户端(分别在不同端口上,比如 70027003

(2)启动后 RabbitMQ 中会自动创建一个 topic 类型的 Exchange 和两个以 springCloudBus.anonymous. 开头的匿名 Queue

(3)分别访问两个客户端的 /test 接口,页面返回对应环境的 from 属性内容。当我们修改 Git 参考里的配置文件后,再次访问 /test 接口,会发现此时属性值并没有变化。

(4)这时我们发送 POST 请求到任意一个客户端实例的 /actuator/bus-refresh 接口上:
(1)这里我们通过向服务实例请求 Spring Cloud Bus/bus-refresh 接口,从而触发总线上其他所有服务实例的 /fresh。当在一些特殊场景下,我们希望可以刷新微服务中某个具体实例的配置,那么可以借助该接口的 destination 参数实现,比如:
  • 请求 /actuator/bus-refresh/hangge-client:7002 则只有实例名为 hangge-client:7002 的应用才会刷新配置。
  • 请求 /actuator/bus-refresh/hangge-client:** 则会触发 hangge-client 服务的所有实例进行刷新。
(2)而从 Git 仓库中配置的修改到发起 /actuator/bus-refreshPOST 请求这一步实际中可以同 Git 仓库的 Web Hook 来自动触发。
  • 由于所有连接到消息总线上的应用都会接收到更新请求,所以在 Web Hook 中就不需要维护所有检点内容来进行更新,解决原先每个实例都要通过 Web Hook 来逐个进行刷新的问题。

(5)然后再次访问两个客户端的 /test 接口,可发现两个请求都会返回最新的 from 属性。说明这两个连接到消息总线上的应用都收到了更新请求,并进行了配置更新。

(6)整个系统的架构图如下下所示:

附一:架构优化

1,问题描述

(1)在之前的架构中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。
(2)但仍然有个不足,我们指定的应用实例会不同于集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作。
比如需要对服务实例进行迁移,那么我们不得不修改 Web Hook 中的配置等。所以要尽可能地让服务集群中的各个节点是对等的。

2,解决办法

(1)我们在 Config Server 中也引入 Spring Cloud Bus,参考前面客户端配置部分,将配置服务端 config-server 也加入到消息总线中来。
(2)这样以后 /actuator/bus-refresh 请求不在发送到具体服务实例上,而是发送给 Config Server(如果需要指定服务或者实例进行配置更新同样通过 destination 参数指定)

(2)修改后的架构如下。我们的服务实例不需要再承当触发配置更新的指责。同时,对于 Git 的触发等配置都只需要针对 Config Server 即可,从而简化了集群上的一些维护工作。

附二:RabbitMQ 配置

    Spring Cloud Bus 中的 RabbitMQ 整合使用了 Spring BootConnectionFactory,所以在 Spring Cloud Bus 中支持以 spring.rabbit.mq 为前缀的 Spring Boot 配置属性。具体的配置属性、说明以及默认值如下表所示:
属性名 说明 默认值
spring.rabbitmq.address 客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合  
spring.rabbitmq.cache.channel.checkout-timeout 当缓存已满时,获取Channel的等待时间,单位为毫秒  
spring.rabbitmq.cache.channel.size 缓存中保持的Channel数量  
spring.rabbitmq.cache.connection.mode 连接缓存的模式 CHANNEL
spring.rabbitmq.cache.connection.size 缓存的连接数  
spring.rabbitmq.connnection-timeout 连接超时参数单位为毫秒:设置为“0”代表无穷大  
spring.rabbitmq.dynamic 默认创建一个AmqpAdmin的Bean true
spring.rabbitmq.host RabbitMQ的主机地址 localhost
spring.rabbitmq.listener.acknowledge-mode 容器的acknowledge模式  
spring.rabbitmq.listener.auto-startup 启动时自动启动容器 true
spring.rabbitmq.listener.concurrency 消费者的最小数量  
spring.rabbitmq.listener.default-requeue-rejected 投递失败时是否重新排队 true
spring.rabbitmq.listener.max-concurrency 消费者的最大数量  
spring.rabbitmq.listener.prefetch 在单个请求中处理的消息个数,他应该大于等于事务数量  
spring.rabbitmq.listener.retry.enabled 不论是不是重试的发布 false
spring.rabbitmq.listener.retry.initial-interval 第一次与第二次投递尝试的时间间隔 1000
spring.rabbitmq.listener.retry.max-attempts 尝试投递消息的最大数量 3
spring.rabbitmq.listener.retry.max-interval 两次尝试的最大时间间隔 10000
spring.rabbitmq.listener.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.listener.retry.stateless 不论重试是有状态的还是无状态的 true
spring.rabbitmq.listener.transaction-size 在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值  
spring.rabbitmq.password 登录到RabbitMQ的密码  
spring.rabbitmq.port RabbitMQ的端口号 5672
spring.rabbitmq.publisher-confirms 开启Publisher Confirm机制 false
spring.rabbitmq.publisher-returns 开启publisher Return机制 false
spring.rabbitmq.requested-heartbeat 请求心跳超时时间,单位为秒  
spring.rabbitmq.ssl.enabled 启用SSL支持 false
spring.rabbitmq.ssl.key-store 保存SSL证书的地址  
spring.rabbitmq.ssl.key-store-password 访问SSL证书的地址使用的密码  
spring.rabbitmq.ssl.trust-store SSL的可信地址  
spring.rabbitmq.ssl.trust-store-password 访问SSL的可信地址的密码  
spring.rabbitmq.ssl.algorithm SSL算法,默认使用Rabbit的客户端算法库  
spring.rabbitmq.template.mandatory 启用强制信息 false
spring.rabbitmq.template.receive-timeout receive()方法的超时时间 0
spring.rabbitmq.template.reply-timeout sendAndReceive()方法的超时时间 5000
spring.rabbitmq.template.retry.enabled 设置为true的时候RabbitTemplate能够实现重试 false
spring.rabbitmq.template.retry.initial-interval 第一次与第二次发布消息的时间间隔 1000
spring.rabbitmq.template.retry.max-attempts 尝试发布消息的最大数量 3
spring.rabbitmq.template.retry.max-interval 尝试发布消息的最大时间间隔 10000
spring.rabbitmq.template.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.username 登录到RabbitMQ的用户名  
spring.rabbitmq.virtual-host 连接到RabbitMQ的虚拟主机  
评论

全部评论(0)

回到顶部