Flink - Savepoint使用详解1(基本介绍、功能实现的前提条件)
作者:hangge | 2025-03-31 08:35
一、基本介绍
1,什么是 Savepoint?
(1)Checkpoint 是为了保证应用在出现故障时可以顺利重启恢复。而 Savepoint 是为了有计划的备份任务,实现任务升级后可恢复。
- 任务升级主要包括:增减并行度、调整业务逻辑、以及升级 Flink 版本时的任务迁移。
- Flink 通过 Savepoint 功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断。
(2)Savepoint 会生成全局,一致性快照,可以保存数据源 offset,operator 操作状态等信息,可以从应用在过去任意做了 savepoint 的时刻开始继续消费。
- Savepoint 的生成算法和 Checkpoint 是完全一样的,所以可以把 Savepoint 认为是包含了一些额外元数据的 Checkpoint,所以 Savepoint 本质上是特殊的 Checkpoint。
(3)Savepoint 和 Checkpoint 可以同时执行,互不影响,Flink 不会因为正在执行 Checkpoint 而推迟 Savepoint 的执行。
2,Checkpoint 与 Savepoint 对比
| Checkpoint | Savepoint | |
| 中文翻译 | 检查点/快照 | 保存点 |
| 触发方式 | 自动触发,自动恢复 | 手工触发,手工恢复 |
| 作用 | 实现任务故障恢复 | 有计划的备份,保证任务升级后再恢复 |
| 特点 | 轻量级 支持增量快照(RocksDB) |
重量级 标准格式存储,允许代码或 配置出现变化 |
(1)中文翻译:
- Checkpoint 可以翻译为检查点或者快照。
- Savepoint 可以翻译为保存点。
(2)触发方式:
- Checkpoint 是由 JobManager 定时触发快照并自动清理,不需要用户干预,当任务故障重启时自动恢复。
- Savepoint 面向用户,完全根据用户的需要触发与清理,恢复的时候也是根据需求进行恢复。
(3)作用:
- Checkpoint 主要是为了实现任务故障恢复的,他的侧重点是容错,当 Flink 作业意外失败,重启时可以从之前生成的 CheckPoint 自动恢复运行,不影响作业逻辑的准确性。
- SavePoint 侧重点是维护,当 Flink 作业需要在人工干预下手动重启、升级、或者迁移时,先将状态整体写入可靠存储,维护完毕之后再从 SavePoint 恢复,他属于有计划的备份。
(4)特点:
- Checkpoint 属于轻量级的快照,因为 Checkpoint 的频率往往比较高,所以 Checkpoint 的存储格式非常轻量级,但作为权衡牺牲了一切可移植的东西,例如:不保证改变并行度和升级的兼容性。
- Savepoint 属于重量级的快照,他会以二进制的形式存储所有状态数据和元数据,执行起来比较慢而且贵,但是能够保证程序的可移植性 ,例如并行度改变或代码升级之后,仍然能正常恢复。
- Checkpoint 是支持增量快照的(如果状态数据存储在 RocksDB 里面),对于超大状态的 作业而言可以降低写入成本。
- Savepoint 并不会连续自动触发,所以不支持增量,只支持全量。
二、Savepoint 保证程序可移植性的前提条件
(1)Savepoint 功能实现的前提条件是需要保证任务中所有有状态的算子都配置好下面这两个参数:
- 第一个参数是:算子唯一标识。
- 第二个参数是:算子最大并行度,这个参数只针对使用了 keyed State 的算子。
(2)这两个参数会被固化到 Savepoint 数据中,不可更改,如果新任务中这两个参数发生了变化,就无法从之前生成的 Savepoint 数据中启动并恢复数据了,只能选择丢弃之前的状态从头开始运行。
1,算子唯一标识
(1)默认情况下 Flink 会给每个算子分配一个唯一标识。但是这个标识是根据前面算子的标识并且结合一些规则生成的,这也就意味着任何一个前置算子发生改变都会导致该算子的标识发生变化 。
- 例如:我们添加或者删除一个算子,这样后面算子的唯一标识就变了,就不可控了。
(2)只要任务中的算子唯一标识发生了变化,Savepoint 保存的状态数据基本上就无法用来恢复了,因为之前保存的算子标识和现在最新的算子标识不一样了。
- 咱们前面说过,Savepoint 会以二进制的形式存储所有状态数据和元数据,这里的算子唯一标识就属于元数据中的内容。当 Flink 任务从 Savepoint 启动时,会利用算子的唯一标识将 Savepoint 保存的状态映射到新任务对应的算子中,只有当新任务的算子唯一标识和 Savepoint 数据中保存的算子标识相同时,状态才能顺利恢复。
(3)所以说如果我们没有给有状态的算子手工设置唯一标识,那么在任务升级时就会受到很多限制。以下图为例:
- 在最开始的任务版本,里面有 Source、Map 和 Sink 这三个组件,假设这三个组件都是有状态的,我们没有给这些组件手工设置唯一标识,使用的是默认的自动生成的。这里的唯一标识其实就是 uid。
- 假设 source 自动生成的唯一标识是 uid-001,Map 自动生成的唯一标识是 uid-002,Sink 自动生成的唯一标识是 uid-003。
- 当这个任务运行了一段时间之后,我们的业务逻辑发生了变化,所以对代码做了一些修改,增加了一个 Flatmap 组件。
- 此时还是使用的默认生成的唯一标识,那此时 source 自动生成的唯一标识可能还是 uid-001,map 自动生成的唯一标识也还是 uid-002,但是 flatmap 自动生成的唯一标识可能是 uid-003,最后 sink 自动生成的唯一标识就可能是 uid-004 了。
- 这样新任务中 sink 的唯一标识和之前任务中 sink 的唯一标识就不一样了,那么再基于之前任务生成的 savepoint 数据就会导致无法恢复了。

(4)为了能够在任务的不同版本之间顺利升级,我们需要通过 uid(…) 方法手动的给算子设置 uid。类似下面代码:
- 在 DataSouce、中间的转换算子、DataSink 上面都可以设置 uid。
source.uid(...) transform.uid(...) sink.uid(...)
(5)其实也没必要给所有的组件都设置 uid,最重要的是给包含了状态的组件设置 uid,没有状态的组件也不会涉及到数据恢复,就没必要设置了。
- 例如下面代码中的设置,source 和 map 中维护了状态,所以需要设置 uid,其它的就不用设置了,让程序自动生成即可。
- 此时我们手工指定 uid 之后,后期就算在任务中新增了一个组件,之前的组件的 uid 也不会变化了,任务基于之前生成的 Savepoint 数据启动的时候依然是可以恢复数据的。
val dataStream = env.addSource(new StatefulSource())
.uid("source-uid") //有状态的source,建议手工设置uid
.shuffle() //无状态的算子,自动生成uid即可
.map(new StatefulMapper())
.uid("map-uid") //有状态的map,建议手工设置uid
.print() //无状态的sink,自动生成uid即可
2,算子最大并行度
(1)我们知道,Flink 中 Keyed State 类型的状态数据在恢复时,是按照 KeyGroup 为单位恢复的,每个 KeyGroup 中包含一部分 key 的数据。
- 针对 Keyed State,状态在扩缩容的时候会以 KeyGroup 为单位进行重新分配。
(2)KeyGroup 的个数等于算子的最大并行度。注意:算子的最大并行度并不是算子的并行度,
- 算子的最大并行度是通过 setMaxParallelism() 方法设置的,
- 算子的并行度是通过 setParallelism() 方法设置的。
(3)当我们设置的算子并行度大于算子最大并行度时,任务在重启的时候有些并行度就分配不到 KeyGroup 了,这样会导致 Flink 任务无法从 Savepoint 恢复数据。
注意:此时也是无法从 Checkpoint 中恢复数据的。
- 设置全局算子最大并行度:env.setMaxParallelism()
- 设置某个算子最大并行度:.map(..).setMaxParallelism()
(5)如果我们没有手动设置最大并行度的话,算子最大并行度默认是 128,最大是 32768。
- 如果 Flink 任务中算子的并行度比较小的时候,则算子最大并行度默认就是 128。
- 如果 Flink 任务中算子的并行度比较大,则会按照 KeyGroupRangeAssignment 类的 computeDefaultMaxParallelism 方法里相应公式公式生成对应的值,这个值不是固定的。
- 所以说如果我们想要保证 Savepoint 的可移植性,那么最好是手工设置一个固定的算子最大并行度。
附:算子最大并行度注意事项
1,问题描述
(1)在工作中还遇到过一个关于算子最大并行度的问题,当时我们在计算某一个业务指标的时候,这个任务中也用到了基于 keyed state 类型的状态,这个业务指标前期数据量比较小,所以最开始给这个任务设置的全局并行度为 20,也没有单独给这个任务中的算子设置最大并行度,根据前面的分析,此时默认生成的算子最大并行度就是 128 了。
(2)后期随着平台用户规模的增长,这个业务的数据量呈指数级增长,使用之前的并行度来处理增长后的数据规模就有点力不从心了,导致数据出现了积压,我们尝试调整任务的并行度,当把任务的并行度调整为 128 以上时,发现任务无法从 checkpoint 和 savepoint 进行恢复。这就是所谓的任务并行度调不上去了。
2,原因分析
这是因为我们前面分析的,当算子并行度大于算子最大并行度时,任务在重启的时候有些并行度就分配不到 KeyGroup 了,这样会导致 Flink 任务无法从 Checkpoint 或者 Savepoint 恢复数据。
3,解决方案
(1)此时想要提高计算能力,只有一种方法,那就是放弃状态中保存的数据,不从状态中恢复,直接单独启动这个任务,这样是可以提高并行度的。
(2)其实合理的一点方案是这样的:我们在开发基于 keyed state 类型的有状态的任务的时候,需要提前预估一下这个任务后期可能处理的数据规模会达到哪种级别,提前设置一个合适的算子最大并行度。这样在前期数据量小的时候,我们可以给任务设置一个比较小的并行度,也不浪费资源,后期数据量上来之后,再调整为一个比较大的并行度。
(3)所以针对一个 keyed state 类型的有状态的 Flink 任务,它未来能扩展到的最大并行度其实取决于这个任务第一次启动时设置的算子最大并行度。
全部评论(0)