返回 导航

SpringBoot / Cloud

hangge.com

消息驱动微服务框架Spring Cloud Stream使用详解2(基本用法)

作者:hangge | 2020-11-03 08:10
    本文通过一个简单的样例演示如何通过 Spring Cloud Stream 实现消息的发送和接收。Spring Cloud Stream 默认提供了三个通道接口:SinkSource Processor,如果当前应用最多只需一个输入通道、一个输出通道,那么直接使用默认通道接口即可,无需再自定义通道接口。

二、基本用法

1,添加依赖

(1)创建一个基础的 Spring Boot 工程,编辑 pom.xml 文件,引入 Spring Cloud StreamRabbitMQ 的支持:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>
 
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

(2)在 application.properties 中配置 RebbitMQ 的连接信息:
spring.rabbitmq.host=192.168.60.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=hangge
spring.rabbitmq.password=123

2,创建消息的消费者

(1)我们创建一个用于接收来自 RabbitMQ 消息的消费者 SinkReceiver
(1)@EnableBinding 注解用来指定一个或多个定义了 @Input @Output 注解的接口,以此实现对消息通道(Channel)的绑定:
  • @EnableBinding(Sink.class) 表示绑定了 Sink 接口。
(2)@StreamListener 主要定义在方法上,作用是将被修饰的方法注册为消息中间件数据流的时间监听器,注解中的属性值对应了监听的消息通道名。
  • @StreamListener(Sink.INPUT) 表示将 receive 方法注册为 input 消息通道的监听处理器,当接收到消息时,该方法机会作出对应的响应动作。
@EnableBinding(Sink.class)
public class SinkReceiver {

    @StreamListener(Sink.INPUT)
    public void reveive(Object payload) {
        System.out.println("收到:" + payload);
    }
}

(2)Spring Cloud Stream 默认提供了三个通道接口:SinkSource Processor,它们源码如下:
(1)三个通道接口说明:
  • Sink Source 中分别通过 @Input @Output 注解定义了输入通道和输出通道。
  • Processor 通过继承 Source Sink 的方式同时定义了一个输入通道和一个输出通道。
(2)@Input @Output 注解都还有一个 Value 属性,该属性可以用来设置消息通道的名称。没有指定具体的值,将默认使用方法名作为消息通道的名称。
  • 因为指定具体的值,Sink Source 的消息通道名称分别为 input output
public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}


public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}


public interface Processor extends Source, Sink {
}

(3)启动程序,我们可以在 RabbitMQ 控制台的 Exchanges 中看到增加了一个名为 input 的交换器:

(4)同时在 Queues 中有一个前缀为 input.anonymous. 的对列:

(5)我们可以进入 input 交换器的管理页面,通过 Publish message 功能来发送一条(由于该交换器与 input.anonymous.xxxx 队列已经绑定,所以消息会转发到对应队列上):

(6)而在应用程序的控制台这边可以看到如下内容,说明消费者已经成功接收到消息队列中消息对象。

3,创建消息的生产者

(1)我们创建一个 Controller 接口,在里面注入默认的 Source 输出通道接口实例,并调用它的发送消息方法。
@RestController
public class HelloController {

    @Autowired
    private Source source;

    @GetMapping("/test")
    public void test(){
        // 将需要发送的消息封装为Message对象
        Message message = MessageBuilder
                .withPayload("welcome to hangge.com")
                .build();
        // 发送Message对象
        source.output().send(message);
    }
}

(2)我们还可以注入消息通道对象来使用,比如下面我们注入 Source 接口中定义的名为 output 的消息输出通道,代码效果同上面是一样的:
注意:很多时候我们在一个微服务应用中可能会创建多个不同名的 MessageChannel 实例,这样通过 @Autowired 注入时,要注意参数命名需要与通道同名才能被正确注入,或者也可以使用 @Qualifier 注解来特别指定具体实例的名称,该名称需要与定义 MessageChannel @Output 中的 value 参数一致,这样才能被正确注入。
@RestController
public class HelloController {

    @Autowired
    private MessageChannel output;

    @GetMapping("/test")
    public void test(){
        // 将需要发送的消息封装为Message对象
        Message message = MessageBuilder
                .withPayload("welcome to hangge.com")
                .build();
        // 发送Message对象
        output.send(message);
    }
}

(3)然后对之前的消费者 SinkReceiver 做一些修改,在 @EnableBinding 注解中增加对 SinkSender 接口的指定,使得 Spring Cloud Stream 能创建出对应的实例。
我们知道 Processor 接口是继承 Sink Processor 接口的,因此 @EnableBinding 注解直接指定 Processor 接口也是可以的:
  • @EnableBinding(Processor.class)
@EnableBinding({Sink.class, Source.class})
public class SinkReceiver {

    @StreamListener(Sink.INPUT)
    public void reveive(Object payload) {
        System.out.println("收到:" + payload);
    }
}

(4)启动应用调用 /test 接口发送消息,会发现消费者并没有接收到消息。这是因为 SinkSource 的消息通道名称分别为 input output,如果没有指定主题的话,那么主题即为通道名称。在 RabbitMQ 这边则对应的是 Exchange 名称:

(5)我们在 application.properties 文件中将这两个通道都设置成同一个主题,比如:publish
spring.cloud.stream.bindings.input.destination=publish
spring.cloud.stream.bindings.output.destination=publish

(6)再次启动应用,发现 RabbitMQ 这边多了个名为 publish Exchange 交换器:

(7)再次调用应用的 /test 接口发送消息,会发现消费者已经能成功接收到消息了。

附:改用 Kafka 作为消息中间件

    上面的样例我们都是使用 RabbitMQ 这个消息中间件,如果想替换成 Kafka 十分简单。由于 Spring Cloud Stream 的特性,我们只需修改一些配置,代码完全不需要修改。
(1)首先编辑 pom.xml 文件,增加引入 Spring Cloud StreamKafka 的支持:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>
 
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

(2)然后在 application.properties 中配置 Kafka 的连接信息即可:
spring.cloud.stream.kafka.binder.brokers=192.168.60.133:9092

(3)启动后我们通过命令查看 Kafka 中的 Topic,可以发现里面多个了名为 publish Topic

(4)调用 /test 接口发送消息,会发现消费者也能成功接收到消息了。
评论

全部评论(0)

回到顶部