消息驱动微服务框架Spring Cloud Stream使用详解3(自定义通道接口、消息反馈)
作者:hangge | 2020-11-04 08:10
在前文中我们使用 Spring Cloud Stream 自带的 Sink 和 Source 通道接口来进行消息的接收和发送。但如果业务复杂,即一个应用中需有多个输入通道,或者多个输出通道(对应不同主题),那么就需要我们自定义接口来实现。

三、自定义消息通道接口
1,接口定义
首先我们自定义一个消息通道接口 MyProcessor,里面包含两个输出通道,以及两个输入通道:
public interface MyProcessor {
String MESSAGE_OUTPUT = "message_output";
String MESSAGE_INPUT = "message_input";
String LOG_OUTPUT = "log_output";
String LOG_INPUT = "log_input";
@Output(MESSAGE_OUTPUT)
MessageChannel messageOutput();
@Input(MESSAGE_INPUT)
SubscribableChannel messageInput();
@Output(LOG_OUTPUT)
MessageChannel logOutput();
@Input(LOG_INPUT)
SubscribableChannel logInput();
}
2,设置主题
编辑项目的 applicaiton.properties 文件,添加如下配置,让 message_output 与 message_input,log_output 与 log_input 通道的主题两两相同(即指向同一个 Exchange 交换器)
spring.cloud.stream.bindings.message_output.destination=hangge.message spring.cloud.stream.bindings.message_input.destination=hangge.message spring.cloud.stream.bindings.log_output.destination=hangge.log spring.cloud.stream.bindings.log_input.destination=hangge.log
3,创建消息消费者
在该服务中我们分别监听并处理 message_input 和 log_input 这两个通道的消息。同时我们还通过 @SendTo 把 processMessage 这个处理方法返回的内容以消息的方式发送到 log_output 通道中。
@EnableBinding(MyProcessor.class)
public class MyProcessorReceiver {
/**
* 通过 MyProcessor.MESSAGE_INPUT 接收消息
* 然后通过 @SendTo 将处理后的消息发送到 MyProcessor.LOG_OUTPUT
*/
@StreamListener(MyProcessor.MESSAGE_INPUT)
@SendTo(MyProcessor.LOG_OUTPUT)
public String processMessage(String message) {
System.out.println("接收到消息:" + message);
DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy年MM月dd日hh:mm:ss");
String now = LocalDateTime.now().format(formatter2);
return now + "接收到1条消息";
}
/**
* 接收来自 MyProcessor.LOG_INPUT 的消息
* 也就是通过上面的 @SendTo 发送来的日志消息,
* 因为 MyProcessor.LOG_OUTPUT 和 MyProcessor.LOG_INPUT 是指向同一 exchange
*/
@StreamListener(MyProcessor.LOG_INPUT)
public void processLog(String message) {
System.out.println("接收到日志:" + message);
}
}
4,创建消息生产者
最后创建一个 Controller 用来发送消息:
@RestController
public class HelloController {
@Autowired
private MyProcessor myProcessor;
@GetMapping("/test")
public void test(){
// 将需要发送的消息封装为Message对象
Message message = MessageBuilder
.withPayload("welcome to hangge.com")
.build();
// 发送Message对象
myProcessor.messageOutput().send(message);
}
}
5,运行测试
(1)启动应用,从 RabbitMQ 控制台中可以看到增加了两个分别名叫 hangge.log 和 hangge.message 的 exchange 交换器:

(2)访问 /test 接口发送消息,可以看到控制台输出如下消息,说明消息的生产和消费都成功了。整个流程就是:
- 原始消息发送到名为 hangge.messages 的 exchange
- 消费者从名为 hangge.messages 的 exchange 接收原始消息,然后生成日志消息发送到 hangge.log 的 exchange
- 消费者从名为 hangge.log 的 exchange 接收日志消息
全部评论(0)