Spark - RDD使用详解8(持久化:缓存、检查点)
作者:hangge | 2023-10-18 09:21
十、RDD 缓存(Cache)
1,缓存的使用
(1)缓存是指将 RDD 的计算结果存储在内存中,以便后续的重复使用。通过缓存 RDD,可以避免每次需要使用 RDD 时都重新计算,从而提高 Spark 应用程序的性能。
- 比如下面样例,我们分别实现了单词数量统计和单词分组这两个功能,虽然 reduceRDD 和 groupRDD 都是从 mapRDD 转换而来,但是每次行动前面的所有操作都会从头执行一次。
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com")) // 对RDD进行转换操作 val wordRDD: RDD[String] = listRDD.flatMap(_.split(" ")) val mapRDD: RDD[(String, Int)] = wordRDD.map { word => { println("*** wordRDD.map ***") (word, 1) } } // 统计单词数量 val reduceRDD = mapRDD.reduceByKey(_+_) reduceRDD.collect().foreach(println) println("-------------------------") // 单词分组 val groupRDD = mapRDD.groupByKey() groupRDD.collect().foreach(println) //关闭 Spark sc.stop()
(2)RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
- 下面样例我们将 mapRDD 缓存到内存中,这样后面再次使用时就不需要重新计算了:
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com")) // 对RDD进行转换操作 val wordRDD: RDD[String] = listRDD.flatMap(_.split(" ")) val mapRDD: RDD[(String, Int)] = wordRDD.map { word => { println("*** wordRDD.map ***") (word, 1) } } // 将RDD缓存到内存中 mapRDD.cache() // 统计单词数量 val reduceRDD = mapRDD.reduceByKey(_+_) reduceRDD.collect().foreach(println) println("-------------------------") // 单词分组 val groupRDD = mapRDD.groupByKey() groupRDD.collect().foreach(println) //关闭 Spark sc.stop()
2,缓存级别
(1)Spark 提供了不同的缓存级别,可以根据需求选择适合的级别。常见的缓存级别包括:
- MEMORY_ONLY:将 RDD 存储在内存中,但不序列化。如果内存不足,可能会丢失数据。
- MEMORY_AND_DISK:将 RDD 存储在内存中,如果内存不足,则溢出到磁盘。
- MEMORY_ONLY_SER:将 RDD 存储在内存中,并使用序列化方式。
- MEMORY_AND_DISK_SER:将 RDD 存储在内存中,并使用序列化方式,如果内存不足,则溢出到磁盘。
- OFF_HEAP:将 RDD 存储在堆外内存中,通过 DirectByteBuffer 实现,适用于大规模数据集。
(2)使用 persist() 方法可以指定存储级别进行缓存:
注意:
- 默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“2”表示持久化的数据存为两份。
- cache() 底层其实就是调用了 persist(),并设置缓存级别 MEMORY_ONLY。
mapRDD.persist(StorageLevel.MEMORY_AND_DISK_2)
3,缓存的一些注意事项
(1)缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
(2)Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
4,缓存级别的选择建议
(1)优先使用 MEMORY_ONLY,纯内存速度最快,而且没有序列化不需要消耗 CPU 进行反序列化操作,缺点就是比较耗内存。
(2)也可以使用 MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化。
(3)如果需要进行数据的快速失败恢复,那么就选择带后缀为 _2 的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
(4)能不使用 DISK 相关的策略,就不要使用,因为有的时候,从磁盘读取数据,还不如重新计算一次。
(2)也可以使用 MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化。
(3)如果需要进行数据的快速失败恢复,那么就选择带后缀为 _2 的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
(4)能不使用 DISK 相关的策略,就不要使用,因为有的时候,从磁盘读取数据,还不如重新计算一次。
十一、RDD 检查点(CheckPoint)
1,检查点介绍
所谓的检查点其实就是通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题或应用程序重新计算时,可以从检查点开始重做血缘,减少了开销。
2,检查点流程
(1)SparkContext 设置 checkpoint 目录,用于存放 checkpoint 的数据;
- 对 RDD 调用 checkpoint 方法,然后它就会被 RDDCheckpointData 对象进行管理,此时这个 RDD 的 checkpoint 状态会被设置为 Initialized
(2)待 RDD 所在的 job 运行结束,会调用 job 中最后一个 RDD 的 doCheckpoint 方法,该方法沿着 RDD 的血缘关系向上查找被 checkpoint() 方法标记过的 RDD,并将其 checkpoint 状态从 Initialized 设置为 CheckpointingInProgress
(3)启动一个单独的 job,来将血缘关系中标记为 CheckpointInProgress 的 RDD 执行 checkpoint 操作,也就是将其数据写入 checkpoint 目录
(4)将 RDD 数据写入 checkpoint 目录之后,会将 RDD 状态改变为 Checkpointed;并且还会改变 RDD 的血缘关系,即会清除掉 RDD 所有依赖的 RDD;最后还会设置其父 RDD 为新创建的 CheckpointRDD
3,检查点的使用样例
(1)设置检查点分为如下两个步骤:
- 指定一个检查点目录,用于存储检查点 RDD 的数据。这个目录可以是本地文件系统的路径,也可以是分布式文件系统(如 HDFS)的路径。
- 使用 checkpoint() 方法将 RDD 设置为检查点。当该方法调用时,Spark 将计算 RDD 的所有转换和操作,并将计算结果写入持久化存储。这会创建一个具有计算结果的新的 RDD,以后的操作将使用该检查点 RDD。
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 设置检查点目录(如HDFS) sc.setCheckpointDir("./checkpoint1") // 创建RDD val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com")) // 对RDD进行转换操作 val wordRDD: RDD[String] = listRDD.flatMap(_.split(" ")) val mapRDD: RDD[(String, Int)] = wordRDD.map { word => { println("*** wordRDD.map ***") (word, 1) } } // 设置检查点 mapRDD.checkpoint() // 统计单词数量 val reduceRDD = mapRDD.reduceByKey(_+_) reduceRDD.collect().foreach(println) println("-------------------------") // 单词分组 val groupRDD = mapRDD.groupByKey() groupRDD.collect().foreach(println) //关闭 Spark sc.stop()
(2)执行后可以看到设置的检查点目录下会生成如下一些二进制文件:
(3)控制台输出内容如下:
注意:可以看到到 mapRDD 的转换计算其实执行了两次,其中第二次并不是由于复用造成的,而是因为将 RDD 设置为检查点时,该 RDD 的计算将会被强制执行并写入到持久化存储中。因此,检查点虽然可以避免重复计算,但也会引入额外的存储和计算开销。我们需要在性能和资源利用之间进行权衡,并选择合适的 RDD 进行检查点操作。
附:缓存和检查点区别
1,区别对比
(1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
(2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
2,使用建议
(1)建议对 checkpoint() 的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
详细说明如下:
- 默认情况下,如果某个 RDD 没有持久化,但是设置了 checkpoint,那么这个时候,本来 Spark 任务已经执行结束了,但是由于中间的 RDD 没有持久化,在进行 checkpoint 的时候想要将这个 RDD 的数据写入外部存储系统的话,就需要重新计算这个 RDD 的数据,再将其 checkpoint 到外部存储系统中。
- 如果对需要 checkpoint 的 rdd 进行了基于磁盘的持久化,那么后面进行 checkpoint 操作时,就会直接从磁盘上读取 rdd 的数据了,就不需要重新再计算一次了,这样效率就高了。
(2)因此,对需要 checkpoint 的 RDD,先执行 persist(StorageLevel.DISK_ONLY)
提示:如果使用基于内存的持久化也是可以的,不过没那个必要。
全部评论(0)