消息驱动微服务框架Spring Cloud Stream使用详解5(消息类型转换)
作者:hangge | 2020-11-09 08:10
我们知道 Spring Cloud Stream 是基于 Spring Integration 构建起来的,所以 Spring Cloud Stream 完全可以使用 Spring Integration 的原生注解来实现各种业务需求。
Spring Integration 的原生的 @ServiceActivator 是用来实现对输入消息通道的监听,Spring Cloud Stream 新增的 @StreamListener 也是实现同样的功能,不过更加强大。因为它还内置了一系列的消息转换功能,这使得基于 @StreamListener 注解实现的消息处理模型更为简单。
五、消息类型转换
1,基本介绍
(1)Spring Cloud Stream 允许我们使用绑定的 spring.cloud.stream.bindings.<channelName>.content-type 属性以声明式的配置方式为输入和输出通道设置消息内容的类型。
(2)除了可以自定义消息转换器之外,原生的消息转换器已经可以基本满足我们的业务需求。目前,Spring Cloud Stream 中自带支持了以下几种常用的消息类型转换:
- JSON 与 POJO 的互相转换。
- JSON 与 org.springframework.tuple.Tuple 的互相转换。
- Object 与 byte[] 的互相转换。为了实现远程传输序列化的原始字节,应用程序需要发送 byte 类型的数据,或是通过实现 Java 的序列化接由来转换为字节(object 对象必须可序列化)。
- String 与 byte[] 的互相转换。
- Object 向纯文本的转换:Object 需要实现 toString() 方法。
注意:上面所指的 JSON 类型可以表现为一个 byte 类型的数组,也可以是一个包含有效 JSON 内容的字符串。另外,Object 对象可以由 JS0N、byte 数组或者字符串转换而来,但是在转换为 JSON 的时候总是以字符串的形式返回。
2,发送对象 -> 接收对象
(1)比如消息生产者这边我们可以直接发送一个对象:
@RestController public class HelloController { @Autowired private Source source; @GetMapping("/test") public void hello() { // 将需要发送的消息封装为Message对象 Message message = MessageBuilder .withPayload(new User("hangge", 100)) .build(); // 发送Message对象 source.output().send(message); } }
- 其中 User 对象模型定义如下:
@Getter @Setter @AllArgsConstructor @ToString public class User { private String name; private Integer age; }
(2)而消费者这边可以直接接收这个对象:
@EnableBinding(Sink.class) public class SinkReceiver { @StreamListener(Sink.INPUT) public void reveive(User user) { System.out.println("收到:" + user); } }
3,发送 JSON 字符串 -> 接收对象
(1)如果消息生产者这边发送一个 JSON 字符串:
@RestController public class HelloController { @Autowired private Source source; @GetMapping("/test") public void hello() { // 将需要发送的消息封装为Message对象 Message message = MessageBuilder .withPayload("{\"name\":\"hangge\",\"age\":100}") .build(); // 发送Message对象 source.output().send(message); } }
(2)由于 Spring Cloud Stream 内置的消息转换器会自动将 JSON 转换成具体对象,所以消费者这边同样直接可以使用对象进行接收数据:
@EnableBinding(Sink.class) public class SinkReceiver { @StreamListener(Sink.INPUT) public void reveive(User user) { System.out.println("收到:" + user); } }
全部评论(0)