返回 导航

Spark

hangge.com

Spark - RDD使用详解2(转换算子1:Value类型)

作者:hangge | 2023-09-25 09:10

四、Value 类型的转换算子

1,map

(1)该函数将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
def map[U: ClassTag](f: T => U): RDD[U]

(2)下面是一个简单的使用样例:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)

// 创建RDD
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

// 使用map进行值转换
val rdd2: RDD[Int] = rdd1.map(
  num => {
    num * num
  }
)
    
// 使用map进行类型转换
val rdd3: RDD[String] = rdd1.map(
  num => {
    "数字:" + num
  }
)

// 打印结果
rdd2.collect().foreach(println)
rdd3.collect().foreach(println)

//关闭 Spark
sc.stop()
                     

2,mapPartitions

(1)该函数将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
def mapPartitions[U: ClassTag](
 f: Iterator[T] => Iterator[U],
 preservesPartitioning: Boolean = false): RDD[U]
map 和 mapPartitions 的区别:
  • 数据处理角度Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
  • 功能的角度Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。
  • 性能的角度Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

(2)下面样例使得每个分区只保留偶数数据:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 每个分区只保留偶数数据
val rdd2: RDD[Int] = rdd1.mapPartitions(
  datas => {
    datas.filter(_%2==0)
  }
)

// 打印结果
// 2
// 4
// 6
// 8
// 10
rdd2.collect().foreach(println)

(3)下面样例获取每个数据分区的最大值:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 对每个分区进行操作,获取最大值
val rdd2: RDD[Int] = rdd1.mapPartitions(partition => Iterator(partition.max))

// 打印结果
// 3
// 6
// 10
rdd2.collect().foreach(println)

3,mapPartitionsWithIndex

(1)该函数将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
def mapPartitionsWithIndex[U: ClassTag](
 f: (Int, Iterator[T]) => Iterator[U],
 preservesPartitioning: Boolean = false): RDD[U]

(2)下面代码对每个分区进行操作,计算元素之和,并附带分区索引:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 对每个分区进行操作,计算元素之和,并附带分区索引
val rdd2 = rdd1.mapPartitionsWithIndex { (index, partition) =>
  Iterator((index, partition.sum))
}

// 打印结果
// (0,6)
// (1,15)
// (2,34)
rdd2.collect().foreach(println)

4,flatMap

(1)该函数将将数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

(2)下面是一个简单的使用样例:
// 创建RDD
val rdd1 = sc.makeRDD(List(List(1,2),List(3,4)))

// 扁平化后进行映射
val rdd2 =  rdd1.flatMap(list => list)

// 打印结果
rdd2.collect().foreach(println)

(3)下面是对更复杂的嵌套列表进行扁平化处理:
// 创建RDD
val rdd1 = sc.makeRDD(List(List(1,2),3,List(4,5)))

// 扁平化嵌套列表
val rdd2 = rdd1.flatMap {
  case list: List[_] => list
  case element => List(element)
}

// 打印结果
rdd2.collect().foreach(println)

5,glom

(1)该函数将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。这个函数对于需要在分区级别上进行操作的情况非常有用。
def glom(): RDD[Array[T]]

(2)下面样例将将每个分区的数据合并成一个数组,最后打印出来:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 使用glom函数获取每个分区的数据
val rdd2:RDD[Array[Int]] = rdd1.glom()

// 打印每个分区的数据
rdd2.collect().foreach { partition =>
  println(partition.mkString(", "))
}

(3)下面代码将每个分区的数据合并成一个数组后,在每个分区中查找最大值:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 使用glom函数获取每个分区的数据
val rdd2:RDD[Array[Int]] = rdd1.glom()

// 在每个分区中查找最大值
val rdd3 = rdd2.map { partition => partition.max }

rdd3.collect().foreach(println)

6,groupBy

(1)该函数将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,这样的操作称之为 shuffle
注意:同一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

(2)下面样例将数字列表按奇偶数进行分组:
// 创建RDD
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))

// 按奇偶数进行分组
val rdd2:RDD[(Int, Iterable[Int])] = rdd1.groupBy(_%2)

rdd2.collect().foreach(println)

(3)下面代码样例将单词按照单词首写字母进行分组:
// 创建RDD
val rdd1 = sc.makeRDD(List("hangge", "hello", "apple", "google", "baidu", "good"))

// 按奇偶数进行分组
val rdd2:RDD[(Char, Iterable[String])] = rdd1.groupBy{ word =>
  word.charAt(0)
}

// 打印结果
rdd2.collect().foreach { case (firstLetter, words) =>
  println(s"字母'$firstLetter'开头的单词有: ${words.mkString(", ")}")
}

7,filter

(1)该函数将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
注意:当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
def filter(f: T => Boolean): RDD[T]

(2)下面代码过滤出大于等于5的数字:
// 创建RDD
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))

// 过滤出大于等于5的数字
val rdd2 = rdd1.filter { num => num >= 5 }

// 打印结果
rdd2.collect().foreach(println)

8,sample

(1)该函数根据指定的规则从数据集中抽取数据,该函数可以通过以下参数来控制抽样行为:
  • withReplacement:一个布尔值,指定抽样是否允许有放回。如果设置为 true,则允许同一元素在多次抽样中被选中;如果设置为 false,则不允许同一元素在多次抽样中被选中。默认值为 false
  • fraction:一个浮点数,表示要抽样的比例。它指定了从数据集中抽样的记录比例,范围在 0.0 1.0 之间。例如,如果设置为 0.5,则表示抽样 50% 的数据。默认值为 1.0,即抽样整个数据集。
  • seed:一个可选的长整型数值,用于指定随机种子。设置相同的随机种子将产生相同的抽样结果。默认情况下,随机种子是随机生成的。
def sample(
 withReplacement: Boolean,
 fraction: Double,
 seed: Long = Utils.random.nextLong): RDD[T]

(2)下面样例从 RDD 中随机抽样一部分数据:
注意:当将 fraction 参数设置为 0.5 时,并不能保证最终选择的数据数量恰好是原始数据量的 50%,具体有如下几个原因。
  • 分区边界:数据在不同的分区中,抽样操作是在每个分区内独立进行的。抽样的结果取决于每个分区内的数据情况,而不是整个数据集的总体比例。因此,如果数据在不同的分区中不均匀分布,最终选择的数据数量可能不是原始数据量的 50%
  • 随机性:抽样操作涉及随机性,使用随机数生成器进行数据选择。尽管设置相同的随机种子可以产生相同的随机序列,但在分布式环境中,每个计算节点上的随机种子可能不同,从而导致不同节点上的抽样结果有所差异。
// 创建RDD
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))

// 随机抽样一部分数据
val rdd2 = rdd1.sample(withReplacement = false, fraction = 0.5)

// 打印结果
rdd2.collect().foreach(println)

9,distinct

(1)该函数用于对 RDD 中的元素进行去重操作,并返回一个新的 RDD,其中包含去重后的元素。函数可以指定分区数。这个参数用于控制去重操作后生成的 RDD 的分区数。
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

(2)下面样例将数据集中重复的数据去重:
// 创建RDD
val rdd1 = sc.makeRDD(List(1, 2, 3, 1, 6, 2, 4, 5, 3))

// 删除重复的元素
val rdd2 = rdd1.distinct()

// 打印结果
rdd2.collect().foreach(println)

10,coalesce

(1)该函数用于减少数据集的分区数。它将数据集的分区数缩减为指定的值,减少了数据的并行度,但也减少了分区之间的数据传输开销,减小任务调度成本。
提示coalesce() 函数存在一个可选的参数 shuffle,用于控制是否进行 Shuffle 操作。该参数默认值为 false,表示不进行 Shuffle 操作,而是仅简单地将数据合并到更少的分区中。如果将 shuffle 参数设置为 true,则会触发 Shuffle 操作,重新洗牌和重新分区数据。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
 (implicit ord: Ordering[T] = null)
 : RDD[T]

(2)下面样例将原来的 4 个分区数减少为 2 个:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 将分区减少到2个(不进行Shuffle操作)
val rdd2 = rdd1.coalesce(2)
    
// 打印每个分区的数据
rdd2.glom().collect().foreach { partition =>
  println(partition.mkString(", "))
}

11,repartition

(1)该函数可以用于重新分区数据集,增加或减少数据集的分区数。
提示:该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDDrepartition 操作都可以完成,因为无论如何都会经 shuffle 过程。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

(2)下面是一个增加 RDD 的分区数的示例:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 将分区增大到6个
val rdd2 = rdd1.repartition(6)

// 打印每个分区的数据
rdd2.glom().collect().foreach { partition =>
  println(partition.mkString(", "))
}

12,sortBy

(1)该函数可以根据指定的列或表达式对数据进行排序,并返回一个按照指定排序顺序排列的新数据集。参数说明:
  • f: (T) => K:排序键函数,指定了根据哪个列或表达式进行排序。函数 f 接收数据集中的每个元素 T,并返回排序键 K,即用于排序的值。
  • ascending: Boolean(可选,默认值为 true):排序顺序参数,指定排序是升序还是降序。设置为 true 表示升序排序,设置为 false 表示降序排序。
  • numPartitions: Int,用于指定结果数据集的分区数。如果未指定 numPartitions,则结果数据集将保持与原始数据集相同的分区数。
注意:无论指定的结果分区数是否与原始数据集分区数一致,中间均会存在 shuffle 的过程。
def sortBy[K](
 f: (T) => K,
 ascending: Boolean = true,
 numPartitions: Int = this.partitions.length)
 (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

(2)下面样例将数据进行降序排序:
// 创建RDD(指定分区数为3个)
val rdd1 = sc.makeRDD(List(8,1,9,6,4,5,7,10,3,2), 3)

// 将数据进行降序排序(同时将分区数改成4)
val rdd2 = rdd1.sortBy(x => x, ascending = false, 4)

// 打印每个分区的数据
rdd2.glom().collect().foreach { partition =>
  println(partition.mkString(", "))
}
评论

全部评论(0)

回到顶部