返回 导航

Spark

hangge.com

Spark - RDD使用详解5(行动算子)

作者:hangge | 2023-10-05 09:00

七、行动算子

1,reduce

(1)该函数用于聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
def reduce(f: (T, T) => T): T

(2)下面是一个 reduce 函数使用样例:
object Hello {
  def main(args: Array[String]): Unit = {

    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 创建RDD
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    // 对数据集中的数据进行规约操作
    val sum: Int = rdd.reduce(_ + _)

    // 打印结果
    println(sum)

    //关闭 Spark
    sc.stop()
  }
}

2,collect

(1)该函数用于将分布式数据集中的所有元素收集到驱动程序中,并以数组的形式返回。
def collect(): Array[T]

(2)下面是一个 collect 函数使用样例:
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

// 收集数据到 Driver
val array: Array[Int] = rdd.collect()

// 打印结果
array.foreach(println)

3,count

(1)该函数用于计算数据集中元素的数量,它返回 RDD 中元素的个数
def count(): Long

(2)下面是一个 count 函数使用样例:
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

// 统计 RDD 中元素的数量
val count: Long = rdd.count()

// 打印结果
println(count)

4,first

(1)该函数用于获取数据集中的第一个元素,它返回 RDD 中的第一个元素。
def first(): T

(2)下面是一个 first 函数使用样例:
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

// 获取RDD中第一额元素
val first: Int = rdd.first()

// 打印结果
println(first)

5,take

(1)该函数用于从数据集中获取指定数量的元素,它返回一个由 RDD 的前 n 个元素组成的数组:
def take(num: Int): Array[T]

(2)下面是一个 take 函数使用样例:
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

// 获取RDD前2个元素
val result: Array[Int] = rdd.take(2)

// 打印结果
println(result.mkString(","))

6,takeOrdered

(1)该函数返回该 RDD 排序后的前 n 个元素组成的数组。
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

(2)下面是一个 takeOrdered 函数使用样例:
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(5, 3, 1, 4, 2))

// 获取RDD排序后前2个元素
val result: Array[Int] = rdd.takeOrdered(2)

// 打印结果
println(result.mkString(","))

7,aggregate

(1)该函数用于对数据集中的元素进行聚合操作,并返回一个聚合结果。它接受两个参数:初始值(零值)和一个聚合函数。
  • 首先分区的数据通过初始值和分区内的数据进行聚合
  • 然后再和初始值进行分区间的数据聚合
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

(2)下面是一个 takeOrdered 函数使用样例:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 对RDD数据进行聚合
val sum: Int = rdd.aggregate(1000)(_ + _, _ + _)

// 打印结果
println(sum)

8,fold

(1)该函数用于对数据集中的元素进行聚合操作,并返回一个聚合结果。它是 aggregate 的简化版操作(分区内和分区间的聚合函数相同)
def fold(zeroValue: T)(op: (T, T) => T): T

(2)下面是一个 fold 函数使用样例:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 对RDD数据进行聚合
val sum: Int = rdd.fold(1000)(_ + _)

// 打印结果
println(sum)

9,countByKey

(1)该函数用于计算键值对 RDD 中每个键出现的次数,并返回一个键值对集合,其中键是唯一的,值是键出现的频率。
def countByKey(): Map[K, Long]

(2)下面是一个 countByKey 函数使用样例:
// 创建RDD
val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)))

// 计算 RDD 中每个键的出现次数
val result: collection.Map[String, Long] = rdd.countByKey()

// 打印结果
println(result)

10,foreach

(1)该函数可以分布式遍历 RDD 中的每一个元素,调用指定函数:
def foreach(f: T => Unit): Unit = withScope {
 val cleanF = sc.clean(f)
 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

(2)下面样例使用 foreach 方法进行分布式打印元素:
// 创建RDD(指定分区数为3个)
// 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10)
val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3)

// 分布式打印
rdd.foreach(println)

11,saveAsTextFile、saveAsObjectFile、saveAsSequenceFile

(1)这些函数用于将数据保存到不同格式的文件中:
  • saveAsTextFile:该函数用于将数据集保存为文本文件,每个元素作为文件中的一行。它接受一个文件路径作为参数,并将数据集的内容保存到指定的路径中。
  • saveAsObjectFile:该函数用于将数据集保存为序列化对象文件。它接受一个文件路径作为参数,并将数据集的内容保存为序列化对象。
  • saveAsSequenceFile:该函数用于将键值对类型的 RDD 保存为 Hadoop 序列文件(SequenceFile)格式。每个键值对都作为 SequenceFile 中的一个记录。它接受一个文件路径和可选的压缩编解码器作为参数。
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
 path: String,
 codec: Option[Class[_ <: CompressionCodec]] = None): Unit

(2)下面样例使用 saveAsTextFile 函数将 RDD 数据保存到文本文件中(文件数由分区数决定):
// 创建RDD(两个分区)
val rdd = sc.makeRDD(List("hello", "world", "hangge.com"), 2)

// 保存成Text文件(存放在output文件夹下)
rdd.saveAsTextFile("output")
评论

全部评论(0)

回到顶部