Spark - RDD使用详解6(序列化、Kryo)
作者:hangge | 2023-10-13 09:00
八、序列化
1,转换操作为什么需要序列化?
(1)Spark 是分布式执行引擎,其核心抽象是弹性分布式数据集 RDD,其代表了分布在不同节点的数据。Spark 的计算是在 executor 上分布式执行的,故用户开发的关于 RDD 的 map,flatMap,reduceByKey 等 transformation 操作(闭包)有如下执行过程:
- 代码中对象在 driver 本地序列化
- 对象序列化后传输到远程 executor 节点
- 远程 executor 节点反序列化对象
- 最终远程节点执行
(2)在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
(3)在 spark 中 4 个地方用到了序列化:
- 算子中用到了 driver 定义的外部变量的时候
- 将自定义的类型作为 RDD 的泛型类型,所有的自定义类型对象都会进行序列化
- 使用可序列化的持久化策略的时候。比如:MEMORY_ONLY_SER,spark 会将 RDD 中每个分区都序列化成一个大的字节数组。
- shuffle 的时候
2,不进行序列化会出现的问题演示
(1)下面代码算子内用到了外部对象的方法和属性,但该对象没有序列化:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val rdd = sc.makeRDD(List("hello", "world", "hangge.com")) // 创建Search对象,传入查询关键词"h" val search = new Search("h") // 调用Search对象的getMatch1方法并打印结果 search.getMatch1(rdd).collect().foreach(println) println("----------------------------------") // 调用Search对象的getMatch2方法并打印结果 search.getMatch2(rdd).collect().foreach(println) //关闭 Spark sc.stop() } // Search类,用于查询匹配的字符串 class Search(query: String){ // 判断字符串s是否包含查询关键 def isMatch(s: String): Boolean = { s.contains(query) } // 函数序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch1(rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } // 属性序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch2(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) } } }
(2)运行时报“java.io.NotSerializableException”错误,这是因为 Spark 的计算在 executor 端执行,而 isMatch 是 Search 对象中的方法,query 是 Search 对象中的属性。Search 对象存在于 Driver 端,需要将 Search 传到 executor 端才可执行 isMatch。driver 向 executor 端传数据涉及到网络传输,网络只能传字符串,不能传对象和数字,所以需要将对象进行序列化才可进行传递。
3,解决办法
(1)一种方法是让 Search 类继承 Serializable 即可:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val rdd = sc.makeRDD(List("hello", "world", "hangge.com")) // 创建Search对象,传入查询关键词"h" val search = new Search("h") // 调用Search对象的getMatch1方法并打印结果 search.getMatch1(rdd).collect().foreach(println) println("----------------------------------") // 调用Search对象的getMatch2方法并打印结果 search.getMatch2(rdd).collect().foreach(println) //关闭 Spark sc.stop() } // Search类,用于查询匹配的字符串 class Search(query: String) extends Serializable{ // 判断字符串s是否包含查询关键 def isMatch(s: String): Boolean = { s.contains(query) } // 函数序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch1(rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } // 属性序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch2(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) } } }
(2)另一种方法就是使用 case 关键字修饰类:
提示:如果我们将下面 scala 代码反编译为 java 就可以很清晰看到 Search 类底层默认实现了 Serializable 接口,所以使用 case 修饰类即可让类序列化。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val rdd = sc.makeRDD(List("hello", "world", "hangge.com")) // 创建Search对象,传入查询关键词"h" val search = new Search("h") // 调用Search对象的getMatch1方法并打印结果 search.getMatch1(rdd).collect().foreach(println) println("----------------------------------") // 调用Search对象的getMatch2方法并打印结果 search.getMatch2(rdd).collect().foreach(println) //关闭 Spark sc.stop() } // Search类,用于查询匹配的字符串 case class Search(query: String){ // 判断字符串s是否包含查询关键 def isMatch(s: String): Boolean = { s.contains(query) } // 函数序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch1(rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } // 属性序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch2(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) } } }
(3)如果当 Executor 内需要的是一个属性时,我们也可以使用局部变量接收这个属性值,传递局部变量而不是属性,这样也是可以的:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val rdd = sc.makeRDD(List("hello", "world", "hangge.com")) // 创建Search对象,传入查询关键词"h" val search = new Search("h") // 调用Search对象的getMatch1方法并打印结果 //search.getMatch1(rdd).collect().foreach(println) println("----------------------------------") // 调用Search对象的getMatch2方法并打印结果 search.getMatch2(rdd).collect().foreach(println) //关闭 Spark sc.stop() } // Search类,用于查询匹配的字符串 class Search(query: String){ // 判断字符串s是否包含查询关键 def isMatch(s: String): Boolean = { s.contains(query) } // 函数序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 //def getMatch1(rdd: RDD[String]): RDD[String] = { // rdd.filter(isMatch) //} // 属性序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch2(rdd: RDD[String]): RDD[String] = { val localQuery = query rdd.filter(x => x.contains(localQuery)) } } }
附:Kryo 序列化框架
(1)Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark 2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
提示:在特定的数据格式的情况下,KryoSerializer 的性能可以达到 JavaSerializer 的 10 倍以上,当然放到整个 Spark 程序中来考量,比重就没有那么大了,但是以 Wordcount 为例,通常也很容易达到 30% 以上的性能提升。而对于一些 Int 之类的基本类型数据,性能的提升就几乎可以忽略了。KryoSerializer 依赖 Twitter 的 Chill 库来实现,相对于 JavaSerializer,主要的问题在于不是所有的 Java Serializable 对象都能支持。
类别 | 优点 | 缺点 | 备注 |
java native serialization |
兼容性好、和 scala 更好融合
|
序列化性能较低、占用内存空间大(一般是Kryo Serialization 的 10 倍) | 默认的 serializer |
kryo serialization |
序列化速度快、占用空间小(即更紧凑)
|
不支持所有的 Serializable 类型、且需要用户注册要进行序列化的类 class | shuffle 的数据量较大或者较为频繁时建议使用 |
(2)如果要在 Spark 中使用 Kryo 序列化主要经过如下两个步骤:
- 声明使用 kryo 序列化
- 注册序列化类
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]") .setAppName("Hello") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 替换默认的序列化机制 .registerKryoClasses(Array(classOf[Search])) // 注册需要使用 kryo 序列化的自定义类 // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val rdd = sc.makeRDD(List("hello", "world", "hangge.com")) // 创建Search对象,传入查询关键词"h" val search = new Search("h") // 调用Search对象的getMatch1方法并打印结果 search.getMatch1(rdd).collect().foreach(println) println("----------------------------------") // 调用Search对象的getMatch2方法并打印结果 search.getMatch2(rdd).collect().foreach(println) //关闭 Spark sc.stop() } // Search类,用于查询匹配的字符串 case class Search(query: String){ // 判断字符串s是否包含查询关键 def isMatch(s: String): Boolean = { s.contains(query) } // 函数序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch1(rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } // 属性序列化案例 // 对RDD进行过滤,筛选出包含查询关键词的字符串 def getMatch2(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) } } }
全部评论(0)