返回 导航

Spark

hangge.com

Spark - RDD使用详解11(累加器、广播变量)

作者:hangge | 2023-11-08 08:10

十三、累加器

1,基本介绍

(1)Spark 累加器(Spark Accumulators)是一种在分布式计算环境下进行累积计算的特殊变量。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge
(2)Spark 累加器的主要特点如下:
  • 累加器只能进行"添加"操作,不能进行读取或更新操作。
  • 累加器的更新操作是在不同节点上并行执行的。
  • 累加器可以在不同的任务中使用,Spark 会自动处理任务之间的并行计算。

2,系统累加器

(1)累加器可以用于计数某个事件的次数,或者用于求和操作。例如下面样例,统计奇数的个数以及所有奇数的和。
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 创建一个计数器累加器
var counter = sc.longAccumulator("counter")
// 创建一个求和累加器
var sum = sc.longAccumulator("sum")

// 创建 RDD
val rdd = sc.makeRDD(List(1,2,3,4,5))
rdd.foreach(
  num => {
    // 使用累加器记录奇数个数,以及所有奇数和
    if (num % 2 == 1) {
      counter.add(1)
      sum.add(num)
    }
  }
)

//获取累加器的值
println("counter = " + counter.value)
println("sum = " + sum.value)

//关闭 Spark
sc.stop()

(2)累加器还可以用于收集数据到一个列表中。例如下面样例,收集 RDD 中奇数元素。
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 创建一个列表收集器累加器
var oddList = sc.collectionAccumulator[Int]("oddList")

// 创建 RDD
val rdd = sc.makeRDD(List(1,2,3,4,5))
rdd.foreach(
  num => {
    // 将奇数添加到累计起中
    if (num % 2 == 1) {
      oddList.add(num)
    }
  }
)

//获取累加器的值
oddList.value.forEach(println)

//关闭 Spark
sc.stop()

3,自定义累加器

(1)我们可以使用 AccumulatorV2 类来自定义累加器。下面示例我们定义了一个用于单词计数的累加器:
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{
  var map : mutable.Map[String, Long] = mutable.Map()

  // 累加器是否为初始状态
  override def isZero: Boolean = {
    map.isEmpty
  }

  // 复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new WordCountAccumulator
  }

  // 重置累加器
  override def reset(): Unit = {
    map.clear()
  }

  // 向累加器中增加数据 (In)
  override def add(word: String): Unit = {
    // 查询 map 中是否存在相同的单词
    // 如果有相同的单词,那么单词的数量加 1
    // 如果没有相同的单词,那么在 map 中增加这个单词
    map(word) = map.getOrElse(word, 0L) + 1L
  }

  // 合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]):
  Unit = {
    val map1 = map
    val map2 = other.value
    // 两个 Map 的合并
    map = map1.foldLeft(map2)(
      ( innerMap, kv ) => {
        innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
        innerMap
      }
    ) }

  // 返回累加器的结果 (Out)
  override def value: mutable.Map[String, Long] = map
}

(2)下面是使用这个自定义累加器的样例:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 创建自定义累加器对象
var wordCountAccumulator = new WordCountAccumulator
// 注册自定义累加器
sc.register(wordCountAccumulator, "wordCountAccumulator")

// 创建 RDD
val rdd = sc.makeRDD(List("a","hangge","a","go","word","go","a"))
// 在RDD上进行操作,并将元素加到累加器中
rdd.foreach(wordCountAccumulator.add)

// 获取累加器的值
val result = wordCountAccumulator.value
println(result)

//关闭 Spark
sc.stop()

十四、广播变量

1,基本介绍

(1)Spark 广播变量(Spark Broadcast Variables)是一种在分布式计算环境中共享和传播大型只读数据的机制。广播变量可以让 Spark 在集群中的所有节点上复制一份数据,并在任务执行期间通过网络传输而不是通过序列化的方式来访问数据,从而提高性能和效率。
(2)广播变量的主要特点如下:
  • 只读性质:广播变量一旦被创建,其值不能被修改。
  • 共享性质:广播变量可以在集群中的不同节点之间共享。
  • 内存复制:广播变量会将数据复制到每个节点的内存中,而不是通过序列化方式传输数据。
(3)通过下图我们可以深入理解下广播变量:
  • 左边的代码是一个我们经常使用的 map 算子的代码,map 算子中执行对每一个元素乘以一个固定变量的操作,此时这个固定的变量属于外部变量。
    • 默认情况下,算子函数内,使用到的外部变量,会被拷贝到执行这个算子的每一个 task 中。
  • 中间的 MapTask,这些都是 map 算子产生的 task,也就是说这个外部变量会被拷贝到每一个 task 中。如果这个外部变量是一个集合,集合中有上亿条数据,这个网络传输就会很耗时,而且在每个 task 上,占用的内存空间,也会很大。
    • 大家可以想象一个极端情况,如果 map 算子有 10 task,恰好这 10 task 还都在一个 worker 节点上,那么这个时候,map 算子使用的外部变量就会在这个 worker 节点上保存 10 份,这样就很占用内存了。
  • 右边的代码算子函数中使用的外部变量是广播变量,那么每个变量只会拷贝一份到每个节点上。节点上所有的 task 都会共享这一份变量,就可以减少网络传输消耗的时间,以及减少内存占用了。

2,使用样例

(1)下面样例中我们定义了一个广播变量 broadcastData,里面定义了各个 key 对应的乘法系数。然后,我们创建了一个 RDD,并使用 map 操作将 RDD 中的元素与广播变量中的对应 key 的元素相乘。
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 声明广播变量
val data = List(("a",1), ("b", 10), ("c", 100))
val broadcastData = sc.broadcast(data)

// 创建 RDD
val rdd = sc.makeRDD(List(("a",2), ("b", 3), ("c", 4), ("a", 5)))
// 对RDD进行转换
val resultRDD = rdd.map{
  case (key, num) => {
    var num2 = 0
    // 使用广播变量
    for ((k, v) <- broadcastData.value) {
      if (k == key) {
        num2 = v
      }
    }
    (key, num*num2)
  }
}

// 打印结果
resultRDD.collect().foreach(println)

//关闭 Spark
sc.stop()

(2)运行结果如下:
评论

全部评论(0)

回到顶部