消息驱动微服务框架Spring Cloud Stream使用详解3(自定义通道接口、消息反馈)
作者:hangge | 2020-11-04 08:10
在前文中我们使用 Spring Cloud Stream 自带的 Sink 和 Source 通道接口来进行消息的接收和发送。但如果业务复杂,即一个应用中需有多个输入通道,或者多个输出通道(对应不同主题),那么就需要我们自定义接口来实现。
三、自定义消息通道接口
1,接口定义
首先我们自定义一个消息通道接口 MyProcessor,里面包含两个输出通道,以及两个输入通道:
public interface MyProcessor { String MESSAGE_OUTPUT = "message_output"; String MESSAGE_INPUT = "message_input"; String LOG_OUTPUT = "log_output"; String LOG_INPUT = "log_input"; @Output(MESSAGE_OUTPUT) MessageChannel messageOutput(); @Input(MESSAGE_INPUT) SubscribableChannel messageInput(); @Output(LOG_OUTPUT) MessageChannel logOutput(); @Input(LOG_INPUT) SubscribableChannel logInput(); }
2,设置主题
编辑项目的 applicaiton.properties 文件,添加如下配置,让 message_output 与 message_input,log_output 与 log_input 通道的主题两两相同(即指向同一个 Exchange 交换器)
spring.cloud.stream.bindings.message_output.destination=hangge.message spring.cloud.stream.bindings.message_input.destination=hangge.message spring.cloud.stream.bindings.log_output.destination=hangge.log spring.cloud.stream.bindings.log_input.destination=hangge.log
3,创建消息消费者
在该服务中我们分别监听并处理 message_input 和 log_input 这两个通道的消息。同时我们还通过 @SendTo 把 processMessage 这个处理方法返回的内容以消息的方式发送到 log_output 通道中。
@EnableBinding(MyProcessor.class) public class MyProcessorReceiver { /** * 通过 MyProcessor.MESSAGE_INPUT 接收消息 * 然后通过 @SendTo 将处理后的消息发送到 MyProcessor.LOG_OUTPUT */ @StreamListener(MyProcessor.MESSAGE_INPUT) @SendTo(MyProcessor.LOG_OUTPUT) public String processMessage(String message) { System.out.println("接收到消息:" + message); DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy年MM月dd日hh:mm:ss"); String now = LocalDateTime.now().format(formatter2); return now + "接收到1条消息"; } /** * 接收来自 MyProcessor.LOG_INPUT 的消息 * 也就是通过上面的 @SendTo 发送来的日志消息, * 因为 MyProcessor.LOG_OUTPUT 和 MyProcessor.LOG_INPUT 是指向同一 exchange */ @StreamListener(MyProcessor.LOG_INPUT) public void processLog(String message) { System.out.println("接收到日志:" + message); } }
4,创建消息生产者
最后创建一个 Controller 用来发送消息:
@RestController public class HelloController { @Autowired private MyProcessor myProcessor; @GetMapping("/test") public void test(){ // 将需要发送的消息封装为Message对象 Message message = MessageBuilder .withPayload("welcome to hangge.com") .build(); // 发送Message对象 myProcessor.messageOutput().send(message); } }
5,运行测试
(1)启动应用,从 RabbitMQ 控制台中可以看到增加了两个分别名叫 hangge.log 和 hangge.message 的 exchange 交换器:
(2)访问 /test 接口发送消息,可以看到控制台输出如下消息,说明消息的生产和消费都成功了。整个流程就是:
- 原始消息发送到名为 hangge.messages 的 exchange
- 消费者从名为 hangge.messages 的 exchange 接收原始消息,然后生成日志消息发送到 hangge.log 的 exchange
- 消费者从名为 hangge.log 的 exchange 接收日志消息
全部评论(0)