返回 导航

SpringBoot / Cloud

hangge.com

消息驱动微服务框架Spring Cloud Stream使用详解4(消费组、消息分区)

作者:hangge | 2020-11-05 08:10

四、消费组和消息分区

1,未设置消费组和消息分区的情况

    通常来说生产环境中的每个服务都不会以单节点的方式运行,而是都会部署多个实例来实现高可以用和负载均衡。当同一个服务启动多个实例的时候,这些实例会绑定到同一个消息通道的目标主题上。
    默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理。

(1)首先我们在生产者服务中创建一个消息生产者应用,服务启动后每隔 1 秒会向 output 通道输出一跳消息:
    由于 Spring Cloud Stream 是基于 Spring Integration 构建起来的,所以在使用 Spring Cloud Stream 构建消息驱动服务的时候,完全可以使用 Spring Integration 的原生注解(比如 @ServiceActivator@InboundChannelAdapter)来实现各种业务需求。
@InboundChannelAdapter 注解定义了该方法是对 Source.OUTPUT 通道的输出绑定,同时使用 poller 属性将该方法设置为轮询执行,这里我们定义为 1000 毫秒。
@EnableBinding(value = Source.class)
public class SinkSender {

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000"))
    public MessageSource<String> timeMessageSource() {
        SimpleDateFormat formatter = new SimpleDateFormat("hh:mm:ss");
        return () -> new GenericMessage<>("{\"name\":\"hangge\", \"time\":"
                + formatter.format(new Date()) + "}");
    }
}

(2)然后我们将生产者的输出通道绑定目标指向 greetings 主题:
spring.cloud.stream.bindings.output.destination=greetings

(3)接着我们在消费者服务中创建一个消费者应用:
@EnableBinding(Processor.class)
public class SinkReceiver {

    @StreamListener(Sink.INPUT)
    public void reveive(Object payload) {
        System.out.println("收到:" + payload);
    }
}

(4)让后我们同样将消费者的输入通道定目标指向 greetings 主题:
spring.cloud.stream.bindings.input.destination=greetings

(5)启动服务,其中消费者服务启动两个实例(使用不同端口,比如:70027003)。可以发现每次生产者产生的消息都会被每一个消费者实例接收和处理。
    默认情况下,当没有为应用指定消费组的时候,Spring Cloud Stream 会为其分配一个独立的匿名消费组。所以,如果同一主题下的所有应用都没有被指定消费组的时候,当有消息发布之后,所有的应用都会对其进行消费,因为它们各自都属于一个独立的组。

2,设置消费组

(1)如果我们希望生产者产生的消息只被其中一个实例消费,这个时候就需要为这些消费者设置消费组来实现。我们在消费者服务这边添加 spring.cloud.stream.bindings.xxxxx.group 属性即可:
注意:大部分情况下,我们在创建 Spring Cloud Stream 应用的时候,建议最好为其指定一个消费组,以防止对消息的重复处理,除非该行为需要这样做(比如刷新所有实例的配置等)
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.group=Service-A

(2)再次重启消费者的两个服务实例(目前同属 Service-A 这个消费组),可以发现,每个生产者发出的消息会被启动的消费者以轮询的方式进行接收和处理。

3,指定消息分区

(1)消息分区的作用是当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。
    从上面样例可以看出,消费组无法控制消息具体被哪个实例消费。也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。但是对于一些业务场景,需要对一些具有相同特征的消息设置每次都被同一个消费实例处理:
  • 比如,一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身聚合这些数据,那么消息生产者可以为消息增加一个固有的特征 ID 来进行分区,使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。

(2)为了实现消息分区,我们对消费者的配置文件做了一些修改,下面是第一个消费者实例的配置:
我们在之前的基础上增加了三个参数,作用如下:
  • spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区功能
  • spring.cloud.stream.instance-count:该参数指定了当前消费者的总实例数量。
  • spring.cloud.stream.instance-index:该参数设置当前实例的索引号(从 0 开始)
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.group=Service-A

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0

(3)另一个消费者实例的配置如下,可以看到只是索引号不一样:
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.group=Service-A

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=1

(4)而生产者这边配置也做了一些修改:
我们在之前的基础上增加了两个参数,作用如下:
  • spring.cloud.stream.bindings.output.producer.partition-count:该参数指定了消息分区的数量
  • spring.cloud.stream.bindings.output.producer.partition-key-expression:该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则配置 SpEL 来生成合适的分区键。
spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.producer.partition-count=2
spring.cloud.stream.bindings.output.producer.partition-key-expression=0

(5)这里分区键表达式我直接设置成固定的 0,这样生产者发出的消息只会被分区为 0 的实例接收并处理:

(6)前面提到 spring.cloud.stream.bindings.output.producer.partition-key-expression 参数可以通过设置 SpEL 表达式来根据实际消息来动态选择输出分区。下面通过样例进行演示(只有生产者这边需要修改,消费者方面不需要变化):
  • 首先我们定义一个 Bean 作为消息发送对象,注意对象中的 partition 属性用于指定该消息需要发送的分区索引:
@Setter
@Getter
@AllArgsConstructor
public class MyMessage {
    private String name;
    private Date date;
    private Integer partition;
}

@EnableBinding(value = Source.class)
public class SinkSender {

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000"))
    public MessageSource<MyMessage> timeMessageSource() {
        return () -> new GenericMessage<>(new MyMessage("hangge", new Date(), 1));
    }
}

spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.producer.partition-count=2
spring.cloud.stream.bindings.output.producer.partition-key-expression=payload.partition


评论

全部评论(0)

回到顶部