Flink - Savepoint使用详解3(无法恢复的情况以及解决方案)
作者:hangge | 2025-04-02 09:02
我在之前的文章中讲到在某些特殊情况下会导致任务无法从 Savepoint 中恢复(点击查看)。下面来针对两个比较常见的故障场景进行分析:
(3)然后执行如下命令向集群中提交此任务。
(4)当任务正常启动之后,在 socket 中模拟产生数据:
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
(8)然后我们停止当前的任务:

(10)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
(11)此时发现任务提交上去之后会自动失败。这是因为不能把 savepoint 中的状态数据映射到 uid 发送了变化的算子中。
(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
(3)接着我们再修改代码,增加一个 map 算子:
(4)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
(5)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。
(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。
(3)然后执行如下命令向集群中提交此任务。
(4)当任务正常启动之后,在 socket 中模拟产生数据:
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
(8)然后我们停止当前的任务:

(10)此时发现任务提交上去之后会自动失败。这是因为改变并行度时,也会影响算子默认生成的 uid。

(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
(3)接着执行如下命令基于之前生成的 savepoint 数据进行恢复。同样的这里我们还通过 -p 指定全局并行度为 2。
(4)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。
- 故障情况1:未手工设置 uid,重启时任务中增加了新的算子
- 故障情况2:未手工设置 uid,重启时算子并行度发生了变化
一、故障情况1:未手工设置 uid,重启时任务中增加了新的算子
1,故障现象
(1)我们还是以前文的有状态的单词计数案例作为演示,具体代码见上文:
(2)接着我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)然后执行如下命令向集群中提交此任务。
bin/flink run \ -m yarn-cluster \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
-
启动后会看到任务 ID 和对应的 yarn applicationid,这个后面手工触发 Savepoint 需要用到:

a b a
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(6)接着我们执行如下命令手工触发 savepoint。注意:flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \ 9c5459aeba3cf63d88be7473ba83a260 \ hdfs://192.168.121.128:9000/flink/savepoint \ -yid application_1733037326153_0002
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5


(9)接着我们修改代码,在任务中增加一个算子,其他代码不变。
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.map(tup=>(tup._1,tup._2))
.keyBy(_._1)
(10)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(11)此时发现任务提交上去之后会自动失败。这是因为不能把 savepoint 中的状态数据映射到 uid 发送了变化的算子中。

(12)如果我们想忽略这个问题,可以指定 --allowNonRestoredState,这样会忽略掉无法映射的状态数据,强制启动。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ --allowNonRestoredState \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2,解决方案
(1)想要解决这个问题,我们就需要手工设置算子的 uid,至少是要指定有状态的算子的 uid。首先我们将代码还原成最初状态,然后在有状态的 map 算子后面设置 uid。
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object WordCountStateWithCheckpointDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//开启Checkpoint
env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
//在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置Checkpoint后的状态数据的存储位置
// 设置存储位置(true表示增量快照)
env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints",
true))
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
//声明一个ValueState类型的状态变量,存储单词出现的总次数
private var countState: ValueState[Int] = _
/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"countState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
countState = getRuntimeContext.getState(valueStateDesc)
}
override def map(value: (String, Int)): (String,Int) = {
//从状态中获取这个key之前出现的次数
var lastNum = countState.value()
val currNum = value._2
//如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
if(lastNum == null){
lastNum = 0
}
//汇总出现的次数
val sum = lastNum+currNum
//更新状态
countState.update(sum)
//返回单词及单词出现的总次数
(value._1,sum)
}
}).uid("vs_map001")
.print()
env.execute("WordCountStateWithCheckpointDemo")
}
}
(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03
(3)接着我们再修改代码,增加一个 map 算子:
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.map(tup=>(tup._1,tup._2))
.keyBy(_._1)
(4)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(5)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(6)在 socket 中模拟产生数据:
a
(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。

二、故障情况2:未手工设置 uid,重启时算子并行度发生了变化
1,故障现象
(1)我们还是以前文的有状态的单词计数案例作为演示,具体代码见上文:
(2)接着我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)然后执行如下命令向集群中提交此任务。
bin/flink run \ -m yarn-cluster \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
-
启动后会看到任务 ID 和对应的 yarn applicationid,这个后面手工触发 Savepoint 需要用到:

a b a
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(6)接着我们执行如下命令手工触发 savepoint。注意:flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \ 9c5459aeba3cf63d88be7473ba83a260 \ hdfs://192.168.121.128:9000/flink/savepoint \ -yid application_1733037326153_0002
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5


(9)接着执行如下命令基于之前生成的 savepoint 数据进行恢复,特别的是这里我们还通过 -p 指定全局并行度为 2。
bin/flink run \ -m yarn-cluster \ -p 2 \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(11)如果我们想忽略这个问题,可以指定 --allowNonRestoredState,这样会忽略掉无法映射的状态数据,强制启动。
bin/flink run \ -m yarn-cluster \ -p 2 \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ --allowNonRestoredState \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2,解决方案
(1)所以想要支持在恢复状态的时候修改并行度,需要给有状态的算子手工设置 uid。
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object WordCountStateWithCheckpointDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//开启Checkpoint
env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
//在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置Checkpoint后的状态数据的存储位置
// 设置存储位置(true表示增量快照)
env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints",
true))
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
//声明一个ValueState类型的状态变量,存储单词出现的总次数
private var countState: ValueState[Int] = _
/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"countState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
countState = getRuntimeContext.getState(valueStateDesc)
}
override def map(value: (String, Int)): (String,Int) = {
//从状态中获取这个key之前出现的次数
var lastNum = countState.value()
val currNum = value._2
//如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
if(lastNum == null){
lastNum = 0
}
//汇总出现的次数
val sum = lastNum+currNum
//更新状态
countState.update(sum)
//返回单词及单词出现的总次数
(value._1,sum)
}
}).uid("vs_map001")
.print()
env.execute("WordCountStateWithCheckpointDemo")
}
}
(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03
(3)接着执行如下命令基于之前生成的 savepoint 数据进行恢复。同样的这里我们还通过 -p 指定全局并行度为 2。
bin/flink run \ -m yarn-cluster \ -p 2 \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(4)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(5)并且查看任务界面,可以看到并行度变成 2 了。

(6)在 socket 中模拟产生数据:
a
(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。

全部评论(0)