Flink - Savepoint使用详解2(手工触发Savepoint、从Savepoint进行恢复)
作者:hangge | 2025-04-01 08:38
一、手工触发 Savepoint
1,准备任务 jar 包
(1)首先我们准备一个有状态的单词计数案例,具体代码可以参考我之前写的文章:
(2)然后将其打包,并将生成的 jar 包上传到集群服务器中。
2,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后执行如下命令向集群中提交此任务。
bin/flink run \ -m yarn-cluster \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
-
启动后会看到任务 ID 和对应的 yarn applicationid,这个后面手工触发 Savepoint 需要用到:

a b a
(4)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(5)接着我们执行如下命令手工触发 savepoint。注意:flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \ 9c5459aeba3cf63d88be7473ba83a260 \ hdfs://192.168.121.128:9000/flink/savepoint \ -yid application_1733037326153_0002
(6)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5

二、从 Savepoint 进行恢复(正常恢复)
(1)首先我们停止当前的任务。

(2)然后执行如下命令尝试使用之前生成的 savepoint 数据来重启恢复任务,注意这里需要通过 -s 参数指定 savepoint 的数据目录。
提示:从 Savepoint 进行恢复时使用的命令和前面我们讲的手动从 checkpoint 恢复的命令是一样的。
其实这个命令本来就是 Savepoint 提供的,只不过也是支持基于 checkpoint 的数据进行恢复。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)当任务正常启动之后,在 socket 中模拟产生数据:
a
(4)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。

全部评论(0)