返回 导航

Spark

hangge.com

Spark - RDD使用详解6(序列化、Kryo)

作者:hangge | 2023-10-13 09:00

八、序列化

1,转换操作为什么需要序列化?

(1)Spark 是分布式执行引擎,其核心抽象是弹性分布式数据集 RDD,其代表了分布在不同节点的数据。Spark 的计算是在 executor 上分布式执行的,故用户开发的关于 RDDmapflatMapreduceByKeytransformation 操作(闭包)有如下执行过程:
  • 代码中对象在 driver 本地序列化
  • 对象序列化后传输到远程 executor 节点
  • 远程 executor 节点反序列化对象
  • 最终远程节点执行

(2)在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

(3)在 spark4 个地方用到了序列化:
  • 算子中用到了 driver 定义的外部变量的时候
  • 将自定义的类型作为 RDD 的泛型类型,所有的自定义类型对象都会进行序列化
  • 使用可序列化的持久化策略的时候。比如:MEMORY_ONLY_SERspark 会将 RDD 中每个分区都序列化成一个大的字节数组。
  • shuffle 的时候

2,不进行序列化会出现的问题演示

(1)下面代码算子内用到了外部对象的方法和属性,但该对象没有序列化:
object Hello {
  def main(args: Array[String]): Unit = {

    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 创建RDD
    val rdd = sc.makeRDD(List("hello", "world", "hangge.com"))

    // 创建Search对象,传入查询关键词"h"
    val search = new Search("h")

    // 调用Search对象的getMatch1方法并打印结果
    search.getMatch1(rdd).collect().foreach(println)

    println("----------------------------------")

    // 调用Search对象的getMatch2方法并打印结果
    search.getMatch2(rdd).collect().foreach(println)

    //关闭 Spark
    sc.stop()
  }

  // Search类,用于查询匹配的字符串
  class Search(query: String){
    // 判断字符串s是否包含查询关键
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }

    // 函数序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch1(rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }

    // 属性序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      rdd.filter(x => x.contains(query))
    }
  }
}

(2)运行时报“java.io.NotSerializableException”错误,这是因为 Spark 的计算在 executor 端执行,而 isMatchSearch 对象中的方法,querySearch 对象中的属性。Search 对象存在于 Driver 端,需要将 Search 传到 executor 端才可执行 isMatchdriverexecutor 端传数据涉及到网络传输,网络只能传字符串,不能传对象和数字,所以需要将对象进行序列化才可进行传递。

3,解决办法

(1)一种方法是让 Search 类继承 Serializable 即可:
object Hello {
  def main(args: Array[String]): Unit = {

    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 创建RDD
    val rdd = sc.makeRDD(List("hello", "world", "hangge.com"))

    // 创建Search对象,传入查询关键词"h"
    val search = new Search("h")

    // 调用Search对象的getMatch1方法并打印结果
    search.getMatch1(rdd).collect().foreach(println)

    println("----------------------------------")

    // 调用Search对象的getMatch2方法并打印结果
    search.getMatch2(rdd).collect().foreach(println)

    //关闭 Spark
    sc.stop()
  }

  // Search类,用于查询匹配的字符串
  class Search(query: String) extends Serializable{
    // 判断字符串s是否包含查询关键
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }

    // 函数序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch1(rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }

    // 属性序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      rdd.filter(x => x.contains(query))
    }
  }
}

(2)另一种方法就是使用 case 关键字修饰类:
提示:如果我们将下面 scala 代码反编译为 java 就可以很清晰看到 Search 类底层默认实现了 Serializable 接口,所以使用 case 修饰类即可让类序列化。
object Hello {
  def main(args: Array[String]): Unit = {

    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 创建RDD
    val rdd = sc.makeRDD(List("hello", "world", "hangge.com"))

    // 创建Search对象,传入查询关键词"h"
    val search = new Search("h")

    // 调用Search对象的getMatch1方法并打印结果
    search.getMatch1(rdd).collect().foreach(println)

    println("----------------------------------")

    // 调用Search对象的getMatch2方法并打印结果
    search.getMatch2(rdd).collect().foreach(println)

    //关闭 Spark
    sc.stop()
  }

  // Search类,用于查询匹配的字符串
  case class Search(query: String){
    // 判断字符串s是否包含查询关键
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }

    // 函数序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch1(rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }

    // 属性序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      rdd.filter(x => x.contains(query))
    }
  }
}

(3)如果当 Executor 内需要的是一个属性时,我们也可以使用局部变量接收这个属性值,传递局部变量而不是属性,这样也是可以的:
object Hello {
  def main(args: Array[String]): Unit = {

    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 创建RDD
    val rdd = sc.makeRDD(List("hello", "world", "hangge.com"))

    // 创建Search对象,传入查询关键词"h"
    val search = new Search("h")

    // 调用Search对象的getMatch1方法并打印结果
    //search.getMatch1(rdd).collect().foreach(println)

    println("----------------------------------")

    // 调用Search对象的getMatch2方法并打印结果
    search.getMatch2(rdd).collect().foreach(println)

    //关闭 Spark
    sc.stop()
  }

  // Search类,用于查询匹配的字符串
  class Search(query: String){
    // 判断字符串s是否包含查询关键
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }

    // 函数序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    //def getMatch1(rdd: RDD[String]): RDD[String] = {
    //  rdd.filter(isMatch)
    //}

    // 属性序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      val localQuery = query
      rdd.filter(x => x.contains(localQuery))
    }
  }
}

附:Kryo 序列化框架

(1)Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark 2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable10 倍。当 RDDShuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

提示:在特定的数据格式的情况下,KryoSerializer 的性能可以达到 JavaSerializer10 倍以上,当然放到整个 Spark 程序中来考量,比重就没有那么大了,但是以 Wordcount 为例,通常也很容易达到 30% 以上的性能提升。而对于一些 Int 之类的基本类型数据,性能的提升就几乎可以忽略了。KryoSerializer 依赖 TwitterChill 库来实现,相对于 JavaSerializer,主要的问题在于不是所有的 Java Serializable 对象都能支持。
类别 优点 缺点 备注
java native serialization
兼容性好、和 scala 更好融合
序列化性能较低、占用内存空间大(一般是Kryo Serialization 10 倍) 默认的 serializer
kryo serialization
序列化速度快、占用空间小(即更紧凑)
不支持所有的 Serializable 类型、且需要用户注册要进行序列化的类 class shuffle 的数据量较大或者较为频繁时建议使用

(2)如果要在 Spark 中使用 Kryo 序列化主要经过如下两个步骤: 
  • 声明使用 kryo 序列化 
  • 注册序列化类
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
object Hello {
  def main(args: Array[String]): Unit = {

    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("Hello")
      .set("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer")  // 替换默认的序列化机制
      .registerKryoClasses(Array(classOf[Search])) // 注册需要使用 kryo 序列化的自定义类

    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 创建RDD
    val rdd = sc.makeRDD(List("hello", "world", "hangge.com"))

    // 创建Search对象,传入查询关键词"h"
    val search = new Search("h")

    // 调用Search对象的getMatch1方法并打印结果
    search.getMatch1(rdd).collect().foreach(println)

    println("----------------------------------")

    // 调用Search对象的getMatch2方法并打印结果
    search.getMatch2(rdd).collect().foreach(println)

    //关闭 Spark
    sc.stop()
  }

  // Search类,用于查询匹配的字符串
  case class Search(query: String){
    // 判断字符串s是否包含查询关键
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }

    // 函数序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch1(rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }

    // 属性序列化案例
    // 对RDD进行过滤,筛选出包含查询关键词的字符串
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      rdd.filter(x => x.contains(query))
    }
  }
}
评论

全部评论(0)

回到顶部