Spark - 性能优化详解2(使用高性能序列化类库)
作者:hangge | 2024-07-26 08:30
二、使用高性能序列化类库
1,基本介绍
(1)Spark 默认会在一些地方对数据进行序列化,如果我们的算子函数使用到了外部的数据(比如 Java 中的自定义类型),那么也需要让其可序列化,否则程序在执行的时候是会报错的,提示没有实现序列化。
(2)因为虽然 Spark 的初始化工作是在 Driver 进程中进行的,但是实际执行是在 Worker 节点的 Executor 进程中进行的;当 Executor 端需要用到 Driver 端封装的对象时,就需要把 Driver 端的对象通过序列化传输到 Executor 端,这个对象就需要实现序列化。否则会报错,提示对象没有实现序列化。
遇到没有实现序列化的对象,解决方法有两种:
- 如果此对象可以支持序列化,则将其实现 Serializable 接口,让它支持序列化
- 如果此对象不支持序列化,针对一些数据库连接之类的对象,这种对象是不支持序列化的,所以可以把这个代码放到算子内部,这样就不会通过 driver 端传过去了,它会直接在 executor 中执行。
2,Spark 的序列化机制
(1)Spark 实际上提供了两种序列化机制:Java 序列化机制(默认使用)和 Kryo 序列化机制。
- Java 序列化机制:默认情况下,Spark 使用 Java 自身的 ObjectInputStream 和 ObjectOutputStream 机制进行对象的序列化。只要我们的类实现了 Serializable 接口,那么都是可以序列化的。但是 Java 序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大,这是它的缺点。
- Kryo 序列化机制:Spark 也支持使用 Kryo 序列化。Kryo 序列化机制比 Java 序列化机制更快,而且序列化后的数据占用的空间更小,通常比 Java 序列化的数据占用的空间要小 10 倍左右。
(2)Kryo 序列化机制之所以不作为默认序列化机制的原因如下:
- 因为有些类型虽然实现了 Seriralizable 接口,但是它也不一定能够被 Kryo 进行序列化;
- 如果我们要得到最佳的性能,Kryo 还要求我们在 Spark 应用程序中,对所有你需要序列化的类型都进行手工注册,这样就比较麻烦了
3,使用 Kryo 序列化的场景
一般是针对一些自定义的对象,例如我们自己定义了一个对象,这个对象里面包含了几十 M,或者上百 M 的数据,然后在算子函数内部,使用到了这个外部的大对象。
- 如果默认情况下,让 Spark 用 java 序列化机制来序列化这种外部的大对象,那么就会导致序列化速度比较慢,并且序列化以后的数据还是比较大。
- 所以,在这种情况下,比较适合使用 Kryo 序列化类库,来对外部的大对象进行序列化,提高序列化速度,减少序列化后的内存空间占用。
4,如何使用 Kryo 序列化机制
(1)首先要用 SparkConf 设置 spark.serializer 的值为 org.apache.spark.serializer.KryoSerializer,就是将 Spark 的序列化器设置为 KryoSerializer。这样,Spark 在进行序列化时,就会使用 Kryo 进行序列化了。
(2)使用 Kryo 时针对需要序列化的类,需要预先进行注册,这样才能获得最佳性能。如果不注册的话,Kryo 也能正常工作,只是 Kryo 必须时刻保存类型的全类名,反而占用不少内存。
- Spark 默认对 Scala 中常用的类型在 Kryo 中做了注册,但是,如果在自己的算子中,使用了外部的自定义类型的对象,那么还是需要对其进行注册。
- 注册自定义的数据类型格式如下:
conf.registerKryoClasses(...)
(3)如果要序列化的自定义的类型,字段特别多,此时就需要对 Kryo 本身进行优化,因为 Kryo 内部的缓存可能不够存放那么大的 class 对象。
- 具体来说,需要调用 SparkConf.set() 方法,设置 spark.kryoserializer.buffer.mb 参数的值,将其调大,默认值为 2,单位是 MB,也就是说最大能缓存 2M 的对象,然后进行序列化。可以在必要时将其调大。
5,Kryo 序列化使用样例
(1)下面样例我们指定使用 kryo 序列化机制对自定义的数据类型 Person 进行序列化:
import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * 需求:Kryo序列化的使用 */ object KryoSerScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("KryoSerScala") .setMaster("local") //指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置是可以省略的 .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型 val sc = new SparkContext(conf) val dataRDD = sc.parallelize(Array("hello hangge","bye google")) val wordsRDD = dataRDD.flatMap(_.split(" ")) val personRDD = wordsRDD.map(word=>Person(word,18)).persist(StorageLevel.MEMORY_ONLY_SER) personRDD.foreach(println(_)) //while循环是为了保证程序不结束,方便在本地查看4040页面中的storage信息 while (true) { ; } } } case class Person(name: String,age: Int) extends Serializable
- 执行任务,然后访问 localhost 的 4040 界面在界面中可以看到 cache 的数据大小是 48 字节。
(2)那我们把 kryo 序列化设置去掉,使用默认的 java 序列化在运行下程序:
val conf = new SparkConf() conf.setAppName("KryoSerScala") .setMaster("local") //指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置是可以省略的 // .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") // .registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型 val sc = new SparkContext(conf)
- 再访问 localhost 的 4040 界面,发现此时占用的内存空间是 127 字节,比使用 kryo 的方式内存空间多占用了将近 3 倍。所以从这可以看出来,使用 kryo 序列化方式对内存的占用会降低很多。
val conf = new SparkConf() conf.setAppName("KryoSerScala") .setMaster("local") //指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置是可以省略的 .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") // .registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型 val sc = new SparkContext(conf)
- 运行任务,再访问 localhost 的 4040 界面。发现此时的内存占用为 76 字节,介于前面的 48 字节和 127 字节之间。
提示:由此可以看出,在使用 kryo 序列化的时候,针对自定义的类型最好是手工注册一下,否则就算开启了 kryo 序列化,性能的提升也是有限的。
全部评论(0)