返回 导航

Spark

hangge.com

Spark - 性能优化详解2(使用高性能序列化类库)

作者:hangge | 2024-07-26 08:30

二、使用高性能序列化类库

1,基本介绍

(1)Spark 默认会在一些地方对数据进行序列化,如果我们的算子函数使用到了外部的数据(比如 Java 中的自定义类型),那么也需要让其可序列化,否则程序在执行的时候是会报错的,提示没有实现序列化。

(2)因为虽然 Spark 的初始化工作是在 Driver 进程中进行的,但是实际执行是在 Worker 节点的 Executor 进程中进行的;当 Executor 端需要用到 Driver 端封装的对象时,就需要把 Driver 端的对象通过序列化传输到 Executor 端,这个对象就需要实现序列化。否则会报错,提示对象没有实现序列化。
遇到没有实现序列化的对象,解决方法有两种:
  • 如果此对象可以支持序列化,则将其实现 Serializable 接口,让它支持序列化
  • 如果此对象不支持序列化,针对一些数据库连接之类的对象,这种对象是不支持序列化的,所以可以把这个代码放到算子内部,这样就不会通过 driver 端传过去了,它会直接在 executor 中执行。

2,Spark 的序列化机制

(1)Spark 实际上提供了两种序列化机制:Java 序列化机制(默认使用)和 Kryo 序列化机制。
  • Java 序列化机制:默认情况下,Spark 使用 Java 自身的 ObjectInputStream ObjectOutputStream 机制进行对象的序列化。只要我们的类实现了 Serializable 接口,那么都是可以序列化的。但是 Java 序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大,这是它的缺点。
  • Kryo 序列化机制Spark 也支持使用 Kryo 序列化。Kryo 序列化机制比 Java 序列化机制更快,而且序列化后的数据占用的空间更小,通常比 Java 序列化的数据占用的空间要小 10 倍左右。

(2)Kryo 序列化机制之所以不作为默认序列化机制的原因如下:
  • 因为有些类型虽然实现了 Seriralizable 接口,但是它也不一定能够被 Kryo 进行序列化;
  • 如果我们要得到最佳的性能,Kryo 还要求我们在 Spark 应用程序中,对所有你需要序列化的类型都进行手工注册,这样就比较麻烦了

3,使用 Kryo 序列化的场景

    一般是针对一些自定义的对象,例如我们自己定义了一个对象,这个对象里面包含了几十 M,或者上百 M 的数据,然后在算子函数内部,使用到了这个外部的大对象。
  • 如果默认情况下,让 Spark java 序列化机制来序列化这种外部的大对象,那么就会导致序列化速度比较慢,并且序列化以后的数据还是比较大。
  • 所以,在这种情况下,比较适合使用 Kryo 序列化类库,来对外部的大对象进行序列化,提高序列化速度,减少序列化后的内存空间占用。

4,如何使用 Kryo 序列化机制

(1)首先要用 SparkConf 设置 spark.serializer 的值为 org.apache.spark.serializer.KryoSerializer,就是将 Spark 的序列化器设置为 KryoSerializer。这样,Spark 在进行序列化时,就会使用 Kryo 进行序列化了。

(2)使用 Kryo 时针对需要序列化的类,需要预先进行注册,这样才能获得最佳性能。如果不注册的话,Kryo 也能正常工作,只是 Kryo 必须时刻保存类型的全类名,反而占用不少内存。
  • Spark 默认对 Scala 中常用的类型在 Kryo 中做了注册,但是,如果在自己的算子中,使用了外部的自定义类型的对象,那么还是需要对其进行注册。
  • 注册自定义的数据类型格式如下:
conf.registerKryoClasses(...)

(3)如果要序列化的自定义的类型,字段特别多,此时就需要对 Kryo 本身进行优化,因为 Kryo 内部的缓存可能不够存放那么大的 class 对象。
  • 具体来说,需要调用 SparkConf.set() 方法,设置 spark.kryoserializer.buffer.mb 参数的值,将其调大,默认值为 2,单位是 MB,也就是说最大能缓存 2M 的对象,然后进行序列化。可以在必要时将其调大。

5,Kryo 序列化使用样例

(1)下面样例我们指定使用 kryo 序列化机制对自定义的数据类型 Person 进行序列化:
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:Kryo序列化的使用
 */
object KryoSerScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("KryoSerScala")
      .setMaster("local")
      //指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置是可以省略的
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型
    val sc = new SparkContext(conf)

    val dataRDD = sc.parallelize(Array("hello hangge","bye google"))
    val wordsRDD = dataRDD.flatMap(_.split(" "))
    val personRDD = wordsRDD.map(word=>Person(word,18)).persist(StorageLevel.MEMORY_ONLY_SER)
    personRDD.foreach(println(_))

    //while循环是为了保证程序不结束,方便在本地查看4040页面中的storage信息
    while (true) {
      ;
    }
  }
}

case class Person(name: String,age: Int) extends Serializable
  • 执行任务,然后访问 localhost 4040 界面在界面中可以看到 cache 的数据大小是 48 字节。

(2)那我们把 kryo 序列化设置去掉,使用默认的 java 序列化在运行下程序:
val conf = new SparkConf()
conf.setAppName("KryoSerScala")
  .setMaster("local")
  //指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置是可以省略的
  // .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  // .registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型
val sc = new SparkContext(conf)
  • 再访问 localhost 4040 界面,发现此时占用的内存空间是 127 字节,比使用 kryo 的方式内存空间多占用了将近 3 倍。所以从这可以看出来,使用 kryo 序列化方式对内存的占用会降低很多。

(3)如果我们只是将 spark 的序列化机制改为了 kryo 序列化,但是没有对使用到的自定义类型手工进行注册,那么此时内存的占用会介于前面两种情况之间。这里我们修改代码,只注释掉 registerKryoClasses 这一行代码:
val conf = new SparkConf()
conf.setAppName("KryoSerScala")
  .setMaster("local")
  //指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置是可以省略的
   .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  // .registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型
val sc = new SparkContext(conf)
提示:由此可以看出,在使用 kryo 序列化的时候,针对自定义的类型最好是手工注册一下,否则就算开启了 kryo 序列化,性能的提升也是有限的。
评论

全部评论(0)

回到顶部