返回 导航

Spark

hangge.com

Spark - RDD使用详解8(持久化:缓存、检查点)

作者:hangge | 2023-10-18 09:21

十、RDD 缓存(Cache)

1,缓存的使用

(1)缓存是指将 RDD 的计算结果存储在内存中,以便后续的重复使用。通过缓存 RDD,可以避免每次需要使用 RDD 时都重新计算,从而提高 Spark 应用程序的性能。
  • 比如下面样例,我们分别实现了单词数量统计和单词分组这两个功能,虽然 reduceRDDgroupRDD 都是从 mapRDD 转换而来,但是每次行动前面的所有操作都会从头执行一次。
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 创建RDD
val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com"))

// 对RDD进行转换操作
val wordRDD: RDD[String] = listRDD.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = wordRDD.map {
  word => {
    println("*** wordRDD.map ***")
    (word, 1)
  }
}

// 统计单词数量
val reduceRDD = mapRDD.reduceByKey(_+_)
reduceRDD.collect().foreach(println)
println("-------------------------")

// 单词分组
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)

//关闭 Spark
sc.stop()

(2)RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
  • 下面样例我们将 mapRDD 缓存到内存中,这样后面再次使用时就不需要重新计算了:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 创建RDD
val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com"))

// 对RDD进行转换操作
val wordRDD: RDD[String] = listRDD.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = wordRDD.map {
  word => {
    println("*** wordRDD.map ***")
    (word, 1)
  }
}

// 将RDD缓存到内存中
mapRDD.cache()

// 统计单词数量
val reduceRDD = mapRDD.reduceByKey(_+_)
reduceRDD.collect().foreach(println)
println("-------------------------")

// 单词分组
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)

//关闭 Spark
sc.stop()

2,缓存级别

(1)Spark 提供了不同的缓存级别,可以根据需求选择适合的级别。常见的缓存级别包括:
  • MEMORY_ONLY:将 RDD 存储在内存中,但不序列化。如果内存不足,可能会丢失数据。
  • MEMORY_AND_DISK:将 RDD 存储在内存中,如果内存不足,则溢出到磁盘。
  • MEMORY_ONLY_SER:将 RDD 存储在内存中,并使用序列化方式。
  • MEMORY_AND_DISK_SER:将 RDD 存储在内存中,并使用序列化方式,如果内存不足,则溢出到磁盘。
  • OFF_HEAP:将 RDD 存储在堆外内存中,通过 DirectByteBuffer 实现,适用于大规模数据集。

(2)使用 persist() 方法可以指定存储级别进行缓存:
注意:
  • 默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“2”表示持久化的数据存为两份。
  • cache() 底层其实就是调用了 persist(),并设置缓存级别 MEMORY_ONLY
mapRDD.persist(StorageLevel.MEMORY_AND_DISK_2)

3,缓存的一些注意事项

(1)缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition
(2)Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persistcache

4,缓存级别的选择建议

(1)优先使用 MEMORY_ONLY,纯内存速度最快,而且没有序列化不需要消耗 CPU 进行反序列化操作,缺点就是比较耗内存。
(2)也可以使用 MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化。
(3)如果需要进行数据的快速失败恢复,那么就选择带后缀为 _2 的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
(4)能不使用 DISK 相关的策略,就不要使用,因为有的时候,从磁盘读取数据,还不如重新计算一次。

十一、RDD 检查点(CheckPoint)

1,检查点介绍

    所谓的检查点其实就是通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题或应用程序重新计算时,可以从检查点开始重做血缘,减少了开销。

2,检查点流程

(1)SparkContext 设置 checkpoint 目录,用于存放 checkpoint 的数据;
  • RDD 调用 checkpoint 方法,然后它就会被 RDDCheckpointData 对象进行管理,此时这个 RDD checkpoint 状态会被设置为 Initialized
(2)待 RDD 所在的 job 运行结束,会调用 job 中最后一个 RDD doCheckpoint 方法,该方法沿着 RDD 的血缘关系向上查找被 checkpoint() 方法标记过的 RDD,并将其 checkpoint 状态从 Initialized 设置为 CheckpointingInProgress
(3)启动一个单独的 job,来将血缘关系中标记为 CheckpointInProgress RDD 执行 checkpoint 操作,也就是将其数据写入 checkpoint 目录
(4)将 RDD 数据写入 checkpoint 目录之后,会将 RDD 状态改变为 Checkpointed;并且还会改变 RDD 的血缘关系,即会清除掉 RDD 所有依赖的 RDD;最后还会设置其父 RDD 为新创建的 CheckpointRDD

3,检查点的使用样例

(1)设置检查点分为如下两个步骤:
  • 指定一个检查点目录,用于存储检查点 RDD 的数据。这个目录可以是本地文件系统的路径,也可以是分布式文件系统(如 HDFS)的路径。
  • 使用 checkpoint() 方法将 RDD 设置为检查点。当该方法调用时,Spark 将计算 RDD 的所有转换和操作,并将计算结果写入持久化存储。这会创建一个具有计算结果的新的 RDD,以后的操作将使用该检查点 RDD
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 设置检查点目录(如HDFS)
sc.setCheckpointDir("./checkpoint1")

// 创建RDD
val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com"))

// 对RDD进行转换操作
val wordRDD: RDD[String] = listRDD.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = wordRDD.map {
  word => {
    println("*** wordRDD.map ***")
    (word, 1)
  }
}

// 设置检查点
mapRDD.checkpoint()

// 统计单词数量
val reduceRDD = mapRDD.reduceByKey(_+_)
reduceRDD.collect().foreach(println)
println("-------------------------")

// 单词分组
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)

//关闭 Spark
sc.stop()

(2)执行后可以看到设置的检查点目录下会生成如下一些二进制文件:

(3)控制台输出内容如下:
注意:可以看到到 mapRDD 的转换计算其实执行了两次,其中第二次并不是由于复用造成的,而是因为将 RDD 设置为检查点时,该 RDD 的计算将会被强制执行并写入到持久化存储中。因此,检查点虽然可以避免重复计算,但也会引入额外的存储和计算开销。我们需要在性能和资源利用之间进行权衡,并选择合适的 RDD 进行检查点操作。

附:缓存和检查点区别

1,区别对比

(1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
(2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。

2,使用建议

(1)建议对 checkpoint()RDD 使用 Cache 缓存,这样 checkpointjob 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD
详细说明如下:
  • 默认情况下,如果某个 RDD 没有持久化,但是设置了 checkpoint,那么这个时候,本来 Spark 任务已经执行结束了,但是由于中间的 RDD 没有持久化,在进行 checkpoint 的时候想要将这个 RDD 的数据写入外部存储系统的话,就需要重新计算这个 RDD 的数据,再将其 checkpoint 到外部存储系统中。
  • 如果对需要 checkpoint rdd 进行了基于磁盘的持久化,那么后面进行 checkpoint 操作时,就会直接从磁盘上读取 rdd 的数据了,就不需要重新再计算一次了,这样效率就高了。

(2)因此,对需要 checkpoint RDD,先执行 persist(StorageLevel.DISK_ONLY)
提示:如果使用基于内存的持久化也是可以的,不过没那个必要。
评论

全部评论(0)

回到顶部