SpringBoot - 状态机框架StateMachine使用详解3(状态机持久化)
作者:hangge | 2023-01-21 09:56
通常来说,我们项目的状态机不可能都是从头一路走到尾的,而是可能需要在某个环节停留,然后等待其他业务的触发,再继续下面的流程。比如用户 A 创建了个订单 A,但他可能要第二天才付款。而在他付款前,可能会有其他用户也需要下单。那么我们就需要在订单 A 创建后将状态机状态保存起来,等用户 A 付款前再将其恢复。这个便是持久化。
(2)接着测试一下,我们每次发送事件前会先恢复状态机状态,等发送后再保存状态。
(3)上面代码我们首先发送订单 1 的支付事件,而订单 1 的收货事件等待 6 秒钟后才发送。在这之前我们又分别发送订单 2 的支付事件、收货事件。运行结果如下,可以看到由于做了持久化,状态机不会发生冲突。
(2)然后编辑项目的 application.properties 文件,添加 Redis 连接配置:
(3)然后状态机持久化的配置类内容如下:
(4)最后测试一下,这部分代码和上面内存持久化是一样的:
(5)查看 Redis 也可以看到里面确实保存了状态机状态数据:
七、状态机持久化
1,状态持久化意义
通过持久化状态,Spring StateMachine 可以保证流程可恢复性,高可用性,和可维护性,使得状态机可以长时间运行,在分布式系统中运行。具体原因主要有以下几点:- 故障恢复: 如果状态机的运行过程中发生故障,持久化状态可以帮助状态机恢复到故障之前的状态,从而继续执行业务流程。
- 可恢复性: 如果状态机的运行过程中发生意外中断,例如应用程序重新启动或宕机,持久化状态可以帮助状态机恢复到中断之前的状态,继续执行业务流程。
- 长时间运行: 如果状态机需要长时间运行,例如几天或几周,持久化状态可以帮助状态机恢复到中断之前的状态,并继续执行业务流程。
- 分布式系统: 如果状态机是分布式系统中的一部分,持久化状态可以帮助状态机在不同节点之间进行状态共享,从而维护整体业务流程的一致性。
2,状态持久化方式
- 内存持久化:状态机的状态存储在内存中,在应用程序重新启动后不会被持久化。如果没有配置其他持久化存储,则默认行为为此行为。
- 基于 JDBC 的持久化:我们可以使用 MySQL,PostgreSQL 或 H2 等关系数据库来持久化状态机的状态。为此,我们需要通过扩展 AbstractPersistStateMachineHandler 创建自定义持久化插件,然后配置状态机使用自定义插件。
- 基于 MongoDB 的持久化:我们可以使用 MongoDB 来持久化状态机的状态,使用 MongoDbStateMachinePersister。为此,我们需要配置 MongoTemplate bean 然后将其添加为拦截器到状态机上,可以参考上面的示例
- 基于 Redis 的持久化:我们可以使用 Redis 作为状态机的存储,通过配置 RedisStateMachinePersister。 我们需要配置 RedisConnectionFactory bean,然后创建 RedisStateMachineContextRepository 来保存状态。
3,内存持久化样例
(1)我们对前文的样例做个改造,增加一个持久化配置类,用于实现内存持久化。其原理使用唯一 id 作为 key(本样例是订单 id),把状态机保存到 map 表里面,等需要恢复时再从 map 中取出。
// 状态机持久化的配置类
@Configuration
public class StateMachinePersisterConfig {
/**
* 内存持久化配置
*/
@Bean
public DefaultStateMachinePersister persister() {
return new DefaultStateMachinePersister(new StateMachinePersist() {
//用户保存所有状态机上下文
private Map map = new HashMap();
// 持久化状态机
@Override
public void write(StateMachineContext context, Object contextObj) throws Exception {
System.out.println("持久化状态机:contextObj:" + contextObj.toString());
map.put(contextObj, context);
}
// 获取状态机
@Override
public StateMachineContext read(Object contextObj) throws Exception {
System.out.println("获取状态机,contextObj:" + contextObj.toString());
StateMachineContext stateMachineContext = (StateMachineContext) map.get(contextObj);
return stateMachineContext;
}
});
}
}
(2)接着测试一下,我们每次发送事件前会先恢复状态机状态,等发送后再保存状态。
@SpringBootApplication
public class TestApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
// 状态机对象
@Autowired
private StateMachine<States, Events> stateMachine;
@Autowired
private StateMachinePersister<States, Events, String> stateMachineMemPersister;
//在run函数中,我们定义了整个流程的处理过程
@Override
public void run(String... args) throws Exception {
// 创建第1个订单对象
Order order1 = new Order();
order1.setStates(States.UNPAID);
order1.setId(1);
System.out.println("--- 发送第1个订单支付事件 ---");
boolean result = this.sendEvent(Events.PAY, order1);
System.out.println("> 事件是否发送成功:" + result
+ ",订单编号:" + order1.getId()
+",当前状态:" + stateMachine.getState().getId());
// 在子线程中等待6秒后再发送收货事件
new Thread(() -> {
try {
Thread.sleep(6000);
System.out.println("--- 发送第1个订单收货事件 ---");
boolean result2 = this.sendEvent(Events.RECEIVE, order1);
System.out.println("> 事件是否发送成功:" + result2
+ ",订单编号:" + order1.getId()
+",当前状态:" + stateMachine.getState().getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 创建第2个订单对象
Order order2 = new Order();
order2.setStates(States.UNPAID);
order2.setId(2);
System.out.println("--- 发送第2个订单支付事件 ---");
result = this.sendEvent(Events.PAY, order2);
System.out.println("> 事件是否发送成功:" + result
+ ",订单编号:" + order2.getId()
+",当前状态:" + stateMachine.getState().getId());
System.out.println("--- 发送第2个订单收货事件 ---");
result = this.sendEvent(Events.RECEIVE, order2);
System.out.println("> 事件是否发送成功:" + result
+ ",订单编号:" + order2.getId()
+",当前状态:" + stateMachine.getState().getId());
}
/**
* 发送订单状态转换事件
* synchronized修饰保证这个方法是线程安全的
*/
private synchronized boolean sendEvent(Events changeEvent, Order order) {
boolean result = false;
try {
//启动状态机
stateMachine.start();
//尝试恢复状态机状态
stateMachineMemPersister.restore(stateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent)
.setHeader("order", order).build();
result = stateMachine.sendEvent(message);
//持久化状态机状态
stateMachineMemPersister.persist(stateMachine, String.valueOf(order.getId()));
} catch (Exception e) {
System.out.println("操作失败:" + e.getMessage());
} finally {
stateMachine.stop();
}
return result;
}
}
(3)上面代码我们首先发送订单 1 的支付事件,而订单 1 的收货事件等待 6 秒钟后才发送。在这之前我们又分别发送订单 2 的支付事件、收货事件。运行结果如下,可以看到由于做了持久化,状态机不会发生冲突。

4,基于 Redis 的持久化
(1)要使用 Redis 进行持久化,首先编辑项目的 pom.xml 文件,添加相关依赖:
<!-- redis持久化状态机 -->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-redis</artifactId>
<version>1.2.14.RELEASE</version>
</dependency>
(2)然后编辑项目的 application.properties 文件,添加 Redis 连接配置:
# Redis数据库索引(默认为0) spring.redis.database=0 # Redis服务器地址 spring.redis.host=192.168.60.9 # Redis服务器连接端口 spring.redis.port=6379 # Redis服务器连接密码(默认为空) spring.redis.password=123456 # 连接池最大连接数(使用负值表示没有限制) spring.redis.pool.max-active=8 # 连接池最大阻塞等待时间(使用负值表示没有限制) spring.redis.pool.max-wait=-1 # 连接池中的最大空闲连接 spring.redis.pool.max-idle=8 # 连接池中的最小空闲连接 spring.redis.pool.min-idle=0 # 连接超时时间(毫秒) spring.redis.timeout=0
(3)然后状态机持久化的配置类内容如下:
// 状态机持久化的配置类
@Configuration
public class StateMachinePersisterConfig {
@Resource
private RedisConnectionFactory redisConnectionFactory;
/**
* Redis持久化配置
*/
@Bean
public RedisStateMachinePersister persister() {
RedisStateMachineContextRepository<Events, States> repository
= new RedisStateMachineContextRepository(redisConnectionFactory);
RepositoryStateMachinePersist p = new RepositoryStateMachinePersist<>(repository);
return new RedisStateMachinePersister<>(p);
}
}
(4)最后测试一下,这部分代码和上面内存持久化是一样的:
@SpringBootApplication
public class TestApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
// 状态机对象
@Autowired
private StateMachine<States, Events> stateMachine;
@Autowired
private StateMachinePersister<States, Events, String> stateMachineMemPersister;
//在run函数中,我们定义了整个流程的处理过程
@Override
public void run(String... args) throws Exception {
// 创建第1个订单对象
Order order1 = new Order();
order1.setStates(States.UNPAID);
order1.setId(1);
System.out.println("--- 发送第1个订单支付事件 ---");
boolean result = this.sendEvent(Events.PAY, order1);
System.out.println("> 事件是否发送成功:" + result
+ ",订单编号:" + order1.getId()
+",当前状态:" + stateMachine.getState().getId());
// 在子线程中等待6秒后再发送收货事件
new Thread(() -> {
try {
Thread.sleep(6000);
System.out.println("--- 发送第1个订单收货事件 ---");
boolean result2 = this.sendEvent(Events.RECEIVE, order1);
System.out.println("> 事件是否发送成功:" + result2
+ ",订单编号:" + order1.getId()
+",当前状态:" + stateMachine.getState().getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 创建第2个订单对象
Order order2 = new Order();
order2.setStates(States.UNPAID);
order2.setId(2);
System.out.println("--- 发送第2个订单支付事件 ---");
result = this.sendEvent(Events.PAY, order2);
System.out.println("> 事件是否发送成功:" + result
+ ",订单编号:" + order2.getId()
+",当前状态:" + stateMachine.getState().getId());
System.out.println("--- 发送第2个订单收货事件 ---");
result = this.sendEvent(Events.RECEIVE, order2);
System.out.println("> 事件是否发送成功:" + result
+ ",订单编号:" + order2.getId()
+",当前状态:" + stateMachine.getState().getId());
}
/**
* 发送订单状态转换事件
* synchronized修饰保证这个方法是线程安全的
*/
private synchronized boolean sendEvent(Events changeEvent, Order order) {
boolean result = false;
try {
//启动状态机
stateMachine.start();
//尝试恢复状态机状态
stateMachineMemPersister.restore(stateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent)
.setHeader("order", order).build();
result = stateMachine.sendEvent(message);
//持久化状态机状态
stateMachineMemPersister.persist(stateMachine, String.valueOf(order.getId()));
} catch (Exception e) {
System.out.println("操作失败:" + e.getMessage());
} finally {
stateMachine.stop();
}
return result;
}
}

(5)查看 Redis 也可以看到里面确实保存了状态机状态数据:
全部评论(0)