返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - 状态机框架StateMachine使用详解3(状态机持久化)

作者:hangge | 2023-01-21 09:56
    通常来说,我们项目的状态机不可能都是从头一路走到尾的,而是可能需要在某个环节停留,然后等待其他业务的触发,再继续下面的流程。比如用户 A 创建了个订单 A,但他可能要第二天才付款。而在他付款前,可能会有其他用户也需要下单。那么我们就需要在订单 A 创建后将状态机状态保存起来,等用户 A 付款前再将其恢复。这个便是持久化。

七、状态机持久化

1,状态持久化意义

    通过持久化状态,Spring StateMachine 可以保证流程可恢复性,高可用性,和可维护性,使得状态机可以长时间运行,在分布式系统中运行。具体原因主要有以下几点:
  • 故障恢复: 如果状态机的运行过程中发生故障,持久化状态可以帮助状态机恢复到故障之前的状态,从而继续执行业务流程。
  • 可恢复性: 如果状态机的运行过程中发生意外中断,例如应用程序重新启动或宕机,持久化状态可以帮助状态机恢复到中断之前的状态,继续执行业务流程。
  • 长时间运行: 如果状态机需要长时间运行,例如几天或几周,持久化状态可以帮助状态机恢复到中断之前的状态,并继续执行业务流程。
  • 分布式系统: 如果状态机是分布式系统中的一部分,持久化状态可以帮助状态机在不同节点之间进行状态共享,从而维护整体业务流程的一致性。

2,状态持久化方式

  • 内存持久化:状态机的状态存储在内存中,在应用程序重新启动后不会被持久化。如果没有配置其他持久化存储,则默认行为为此行为。
  • 基于 JDBC 的持久化:我们可以使用 MySQLPostgreSQLH2 等关系数据库来持久化状态机的状态。为此,我们需要通过扩展 AbstractPersistStateMachineHandler 创建自定义持久化插件,然后配置状态机使用自定义插件。
  • 基于 MongoDB 的持久化:我们可以使用 MongoDB 来持久化状态机的状态,使用 MongoDbStateMachinePersister。为此,我们需要配置 MongoTemplate bean 然后将其添加为拦截器到状态机上,可以参考上面的示例
  • 基于 Redis 的持久化:我们可以使用 Redis 作为状态机的存储,通过配置 RedisStateMachinePersister。 我们需要配置 RedisConnectionFactory bean,然后创建 RedisStateMachineContextRepository 来保存状态。

3,内存持久化样例

(1)我们对前文的样例做个改造,增加一个持久化配置类,用于实现内存持久化。其原理使用唯一 id 作为 key(本样例是订单 id),把状态机保存到 map 表里面,等需要恢复时再从 map 中取出。
// 状态机持久化的配置类
@Configuration
public class StateMachinePersisterConfig {
  /**
   * 内存持久化配置
   */
  @Bean
  public DefaultStateMachinePersister persister() {
    return new DefaultStateMachinePersister(new StateMachinePersist() {
      //用户保存所有状态机上下文
      private Map map = new HashMap();

      // 持久化状态机
      @Override
      public void write(StateMachineContext context, Object contextObj) throws Exception {
        System.out.println("持久化状态机:contextObj:" + contextObj.toString());
        map.put(contextObj, context);
      }

      // 获取状态机
      @Override
      public StateMachineContext read(Object contextObj) throws Exception {
        System.out.println("获取状态机,contextObj:" + contextObj.toString());
        StateMachineContext stateMachineContext = (StateMachineContext) map.get(contextObj);
        return stateMachineContext;
      }
    });
  }
}

(2)接着测试一下,我们每次发送事件前会先恢复状态机状态,等发送后再保存状态。
@SpringBootApplication
public class TestApplication implements CommandLineRunner {

  public static void main(String[] args) {
    SpringApplication.run(TestApplication.class, args);
  }

  // 状态机对象
  @Autowired
  private StateMachine<States, Events> stateMachine;

  @Autowired
  private StateMachinePersister<States, Events, String> stateMachineMemPersister;

  //在run函数中,我们定义了整个流程的处理过程
  @Override
  public void run(String... args) throws Exception {
    // 创建第1个订单对象
    Order order1 = new Order();
    order1.setStates(States.UNPAID);
    order1.setId(1);

    System.out.println("--- 发送第1个订单支付事件 ---");
    boolean result = this.sendEvent(Events.PAY, order1);
    System.out.println("> 事件是否发送成功:" + result
            + ",订单编号:" + order1.getId()
            +",当前状态:" + stateMachine.getState().getId());

    // 在子线程中等待6秒后再发送收货事件
    new Thread(() -> {
      try {
        Thread.sleep(6000);
        System.out.println("--- 发送第1个订单收货事件 ---");
        boolean result2 = this.sendEvent(Events.RECEIVE, order1);
        System.out.println("> 事件是否发送成功:" + result2
                + ",订单编号:" + order1.getId()
                +",当前状态:" + stateMachine.getState().getId());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }).start();

    // 创建第2个订单对象
    Order order2 = new Order();
    order2.setStates(States.UNPAID);
    order2.setId(2);

    System.out.println("--- 发送第2个订单支付事件 ---");
    result = this.sendEvent(Events.PAY, order2);
    System.out.println("> 事件是否发送成功:" + result
            + ",订单编号:" + order2.getId()
            +",当前状态:" + stateMachine.getState().getId());

    System.out.println("--- 发送第2个订单收货事件 ---");
    result = this.sendEvent(Events.RECEIVE, order2);
    System.out.println("> 事件是否发送成功:" + result
            + ",订单编号:" + order2.getId()
            +",当前状态:" + stateMachine.getState().getId());

  }

  /**
   * 发送订单状态转换事件
   * synchronized修饰保证这个方法是线程安全的
   */
  private synchronized boolean sendEvent(Events changeEvent, Order order) {
    boolean result = false;
    try {
      //启动状态机
      stateMachine.start();
      //尝试恢复状态机状态
      stateMachineMemPersister.restore(stateMachine, String.valueOf(order.getId()));
      Message message = MessageBuilder.withPayload(changeEvent)
              .setHeader("order", order).build();
      result = stateMachine.sendEvent(message);
      //持久化状态机状态
      stateMachineMemPersister.persist(stateMachine, String.valueOf(order.getId()));
    } catch (Exception e) {
      System.out.println("操作失败:" + e.getMessage());
    } finally {
      stateMachine.stop();
    }
    return result;
  }
}

(3)上面代码我们首先发送订单 1 的支付事件,而订单 1 的收货事件等待 6 秒钟后才发送。在这之前我们又分别发送订单 2 的支付事件、收货事件。运行结果如下,可以看到由于做了持久化,状态机不会发生冲突。 

4,基于 Redis 的持久化

(1)要使用 Redis 进行持久化,首先编辑项目的 pom.xml 文件,添加相关依赖: 
<!-- redis持久化状态机 -->
<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-redis</artifactId>
    <version>1.2.14.RELEASE</version>
</dependency>

(2)然后编辑项目的 application.properties 文件,添加 Redis 连接配置:
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=192.168.60.9
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=123456
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=0

(3)然后状态机持久化的配置类内容如下:
// 状态机持久化的配置类
@Configuration
public class StateMachinePersisterConfig {

  @Resource
  private RedisConnectionFactory redisConnectionFactory;

  /**
   * Redis持久化配置
   */
  @Bean
  public RedisStateMachinePersister persister() {
    RedisStateMachineContextRepository<Events, States> repository
            = new RedisStateMachineContextRepository(redisConnectionFactory);
    RepositoryStateMachinePersist p = new RepositoryStateMachinePersist<>(repository);
    return new RedisStateMachinePersister<>(p);
  }
}

(4)最后测试一下,这部分代码和上面内存持久化是一样的:
@SpringBootApplication
public class TestApplication implements CommandLineRunner {

  public static void main(String[] args) {
    SpringApplication.run(TestApplication.class, args);
  }

  // 状态机对象
  @Autowired
  private StateMachine<States, Events> stateMachine;

  @Autowired
  private StateMachinePersister<States, Events, String> stateMachineMemPersister;

  //在run函数中,我们定义了整个流程的处理过程
  @Override
  public void run(String... args) throws Exception {
    // 创建第1个订单对象
    Order order1 = new Order();
    order1.setStates(States.UNPAID);
    order1.setId(1);

    System.out.println("--- 发送第1个订单支付事件 ---");
    boolean result = this.sendEvent(Events.PAY, order1);
    System.out.println("> 事件是否发送成功:" + result
            + ",订单编号:" + order1.getId()
            +",当前状态:" + stateMachine.getState().getId());

    // 在子线程中等待6秒后再发送收货事件
    new Thread(() -> {
      try {
        Thread.sleep(6000);
        System.out.println("--- 发送第1个订单收货事件 ---");
        boolean result2 = this.sendEvent(Events.RECEIVE, order1);
        System.out.println("> 事件是否发送成功:" + result2
                + ",订单编号:" + order1.getId()
                +",当前状态:" + stateMachine.getState().getId());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }).start();

    // 创建第2个订单对象
    Order order2 = new Order();
    order2.setStates(States.UNPAID);
    order2.setId(2);

    System.out.println("--- 发送第2个订单支付事件 ---");
    result = this.sendEvent(Events.PAY, order2);
    System.out.println("> 事件是否发送成功:" + result
            + ",订单编号:" + order2.getId()
            +",当前状态:" + stateMachine.getState().getId());

    System.out.println("--- 发送第2个订单收货事件 ---");
    result = this.sendEvent(Events.RECEIVE, order2);
    System.out.println("> 事件是否发送成功:" + result
            + ",订单编号:" + order2.getId()
            +",当前状态:" + stateMachine.getState().getId());

  }

  /**
   * 发送订单状态转换事件
   * synchronized修饰保证这个方法是线程安全的
   */
  private synchronized boolean sendEvent(Events changeEvent, Order order) {
    boolean result = false;
    try {
      //启动状态机
      stateMachine.start();
      //尝试恢复状态机状态
      stateMachineMemPersister.restore(stateMachine, String.valueOf(order.getId()));
      Message message = MessageBuilder.withPayload(changeEvent)
              .setHeader("order", order).build();
      result = stateMachine.sendEvent(message);
      //持久化状态机状态
      stateMachineMemPersister.persist(stateMachine, String.valueOf(order.getId()));
    } catch (Exception e) {
      System.out.println("操作失败:" + e.getMessage());
    } finally {
      stateMachine.stop();
    }
    return result;
  }
}

(5)查看 Redis 也可以看到里面确实保存了状态机状态数据:
评论

全部评论(0)

回到顶部