返回 导航

大数据

hangge.com

Flink - Savepoint使用详解2(手工触发Savepoint、从Savepoint进行恢复)

作者:hangge | 2025-04-01 08:38

一、手工触发 Savepoint

1,准备任务 jar 包

(1)首先我们准备一个有状态的单词计数案例,具体代码可以参考我之前写的文章:
(2)然后将其打包,并将生成的 jar 包上传到集群服务器中。

2,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后执行如下命令向集群中提交此任务。
bin/flink run \
 -m yarn-cluster \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(3)当任务正常启动之后,在 socket 中模拟产生数据:
a b
a

(4)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(5)接着我们执行如下命令手工触发 savepoint注意flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \
  9c5459aeba3cf63d88be7473ba83a260 \
  hdfs://192.168.121.128:9000/flink/savepoint \
  -yid application_1733037326153_0002

(6)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5

二、从 Savepoint 进行恢复(正常恢复)

(1)首先我们停止当前的任务。

(2)然后执行如下命令尝试使用之前生成的 savepoint 数据来重启恢复任务,注意这里需要通过 -s 参数指定 savepoint 的数据目录。
提示:从 Savepoint 进行恢复时使用的命令和前面我们讲的手动从 checkpoint 恢复的命令是一样的。 其实这个命令本来就是 Savepoint 提供的,只不过也是支持基于 checkpoint 的数据进行恢复。
bin/flink run \
 -m yarn-cluster \
 -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(3)当任务正常启动之后,在 socket 中模拟产生数据:
a

(4)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。
评论

全部评论(0)

回到顶部