Hadoop - MapReduce中常见的数据压缩格式详解(附使用样例)
作者:hangge | 2024-08-21 09:03
一、基本介绍
1,MapReduce 中常见的数据压缩格式
(1)DEFLATE
- DEFLATE 是同时使用了 LZ77 算法与哈夫曼编码(Huffman Coding)的一个无损数据压缩算法,其底层使用的是 Zlib。
- DEFLATE 压缩与解压的源代码可以在自由、通用的压缩库 Zlib 上找到(Zlib官网)
- jdk 中对 Zlib 压缩库提供了支持,压缩类 Deflater 和解压类 Inflater,Deflater 和 Inflater 都提供了 Native 方法。
(2)Gzip
- Gzip 的实现算法还是 DEFLATE,只是在 DEFLATE 格式上增加了文件头和文件尾
- JDK 同样也对 Gzip 提供了支持,分别是 GZIPOutputStream 和 GZIPInputStream 类,同样可以发现 GZIPOutputStream 是继承于 DeflaterOutputStream 的,GZIPInputStream 继承于 InflaterInputStream,并且可以在源码中发现 writeHeader 和 writeTrailer 方法。
(3)Bzip2
- Bzip2 是 Julian Seward 开发并按照自由软件/开源软件协议发布的数据压缩算法及程序。
- Seward 在 1996 年 7 月第一次公开发布了 Bzip2 0.15 版,在随后几年中这个压缩工具稳定性得到改善并且日渐流行,Seward 在 2000 年晚些时候发布了 1.0 版。
- Bzip2 比传统的 Gzip 的压缩效率更高,但是它的压缩速度较慢。
(4)Lz4
- Lz4 是一种无损数据压缩算法,着重于压缩和解压缩速度。
(5)Lzo
- Lzo 是致力于解压速度的一种数据压缩算法,Lzo 是 Lempel-Ziv-Oberhumer 的缩写,这个算法是无损算法。
(6)Snappy
- Snappy(以前称 Zippy)是 Google 基于 LZ77 的思路用 C++ 语言编写的快速数据压缩与解压程序库,并在 2011 年开源。
- 它的目标并非最大压缩率或与其他压缩程序库的兼容性,而是非常高的速度和合理的压缩率。
注意:DEFLATE 压缩格式底层使用的是 zlib,Gzip 是对 DEFLATE 进行了封装,所以 Hadoop 只有 lzo 没有集成,其他的压缩格式都是可以正常使用的。
2,各压缩格式对比
(1)下面是压缩格式的文件扩展名、是否可切分、压缩比、压缩速度和解压速度这几项进行对比:
各个对比项说明:
- 文件扩展名:表示压缩后的数据文件的后缀名称。
- 是否可切分:表示压缩后的数据文件在被 MapReduce 读取的时候,是否会产生多个 InputSplit。如果这个压缩格式产生的文件不可切分,那也就意味着,无论这个压缩文件有多大,在 MapReduce 中都只会产生 1 个 Map 任务。如果压缩后的文件不大,也就 100M 左右,这样对性能没有多大影响。但是如果压缩后的文件比较大,达到了 1 个 G,由于不可切分,这样只能使用 1 个 Map 任务去计算,性能就比较差了,这个时候就没有办法达到并行计算的效果了。所以是否可切分这个特性是非常重要的,特别是当我们无法控制单个压缩文件大小的时候。
- 压缩比:表示压缩格式的压缩效果,压缩比越高,说明压缩效果越好,对应产生的压缩文件就越小。如果集群的存储空间有限,则需要重点关注压缩比,这个时候需要选择尽可能高的压缩比。
- 压缩速度:表示将原始文件压缩为指定压缩格式消耗的时间。压缩功能消耗的时间会体现在任务最终消耗的时间里面,所以这个指标也需要重点考虑。
- 解压速度:表示将指定压缩格式的数据文件解压为原始文件消耗的时间。因为 MapReduce 在使用压缩文件的时候需要先进行解压才能使用,解压消耗的时间也会体现在任务最终消耗的时间里面,所以这个指标也需要重点考虑。
(2)针对表格中的这几种压缩格式,他们的文件扩展名还是很容易区分的,基本上都是以压缩格式名作为后缀,这个主要是为了后期能够通过文件后缀名快速区分出来使用的哪种压缩格式。
(3)针对是否可切分:主要是 Bzip2 可以原生支持切分,还有一个 Lzo 通过给压缩数据建立索引也可以支持切分。
注意:这个表中所说的是否可切分是针对 TextFile 文件,也就是普通文本文件而言的。因为针对某一些特殊的文件格式,结合不可切分的压缩格式之后,依然是可以支持切分的,这是由于这些特殊文件格式自身的特性决定的,和压缩格式的特性没有关系。
(5)针对压缩速度和解压速度:Bzip2 的压缩速度和解压速度都是最低的,其实也是可以理解的,想要压缩效果好,对应的压缩和解压肯定会多消耗一些时间。
(2)运行后会生成一个 200MB 的文件,文件内容如下。每个单词都是唯一的,没有重复。
(2)将任务打包成 Jar 包上传至 Hadoop 集群的任意一台机器上,并且执行如下命令向集群提交 Jar 包:
(3)我们查看任务执行时的日志信息,主要查看 number of splits 后面的值,这个值表示针对任务的输入数据会产生几个 Split。这里显示的是 2,说明这个 200M 的文件会产生 2 个 Split,最终也只会产生 2 个 map 任务。
(2)任务执行完毕后,查看最终结果文件只有 32MB:
(2)任务执行完毕后,查看最终结果文件同样只有 32MB:
(2)任务执行完毕后,查看最终结果文件同样只有 18MB:
(2)任务执行完毕后,查看最终结果文件为 61MB:
(2)任务执行完毕后,查看最终结果文件只有 53MB:
(4)解决办法是访问 Maven 中央仓库(点击访问),将 LZ4 的 jar 包下载下来。
3,压缩位置
(1)在 MapReduce 的整个过程中,可以在两个地方设置数据压缩格式:
- 一个是针对 Map 阶段的输出数据进行压缩。
- 一个是针对 Reduce 阶段的输出数据进行压缩。
(2)针对 Map 阶段的输出数据:建议选择压缩和解压速度快的压缩格式。
- Map 阶段的数据落盘后会通过 Shuffle,也就是通过网络传输到 Reduce 端。
- 压缩 Map 的输出是可以提高网络传输效率的。但是压缩 Map 的输出会增加 CPU 的消耗。
- Map 阶段在处理数据的时候自己本来就会消耗过多的 CPU,所以此时应该重点考虑使用压缩和解压速度比较快的 LZO、Snappy。
(3)针对 Reduce 阶段的输出数据:需要分为两种场景。
- 如果结果数据是永久保存,此时需要重点考虑压缩效果比较好的 Bzip2 和 Gzip。
- 如果结果数据还需要让另一个 MapReduce 任务继续计算,则需要重点考虑压缩后的数据文件是否支持切分。比如:Bzip2、Lzo。
4,各种压缩格式压缩后的数据文件大小比较
下图里显示的是未压缩的数据文件和各种压缩格式压缩后的数据文件的大小。其中未压缩的数据文件大小为 2267M,图中按照压缩效果进行排列,最右侧的是压缩效果最好的,Bzip2 压缩后的文件只有 203M。
注意:这个压缩后的数据文件大小,和原始数据内容也是有关系的,所以这里的数值大小仅作为参考,但是都是符合这个规律的。
二、数据压缩样例
1,生成测试数据
(1)首先我们创建一个生成测试数据的工具类 GenerateData,代码如下:
public class GenerateData { public static void main(String[] args) throws Exception{ String fileName = "D:\\words.dat"; System.out.println("start: 开始生成200M文件->"+fileName); BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName)); int num = 0; while(num<7080000){ bfw.write("hello_"+num+" hangge_"+num); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } System.out.println("end: 200M文件已生成"); } }
(2)运行后会生成一个 200MB 的文件,文件内容如下。每个单词都是唯一的,没有重复。
(3)最后我们将该文件上传到 HDFS 上:
2,执行未压缩的任务
(1)首先我们编写一个实现单词统计功能的 MapReduce 任务类 WordCountJob,代码如下:
提示:其与之前的 WordCount 任务类主要区别是会解析命令行中通过 -D 传递过来的参数,添加到 conf 中,便于我们启动任务时动态设置各种压缩命令。
public class WordCountJob { /** * 创建自定义mapper类 */ public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1,产生k2,v2 * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); // 迭代切割出来的单词数据 for(String word:words){ // 把迭代出来的单词封装成<k2,v2>的形式 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); // 输出k2,v2 context.write(k2,v2); } } } /** * 创建自定义的reducer类 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> { /** * 针对v2s的数据进行累加求和 * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //v2s {1,1,1,1} //创建一个sum变量,保存v2s的和 long sum = 0L; for(LongWritable v2:v2s){ sum += v2.get(); } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出结果 context.write(k3,v3); } } /** * 组装job = map + reduce */ public static void main(String[] args){ try{ //指定Job需要的配置参数 Configuration conf = new Configuration(); //解析命令行中通过-D传递过来的参数,添加到conf中 String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); //创建一个Job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordCountJob.class); // 指定输入路径 FileInputFormat.setInputPaths(job,new Path(remainingArgs[0])); // 指定输出路径 FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); // 指定reduce相关的代码 job.setReducerClass(MyReducer.class); // 指定k3的类型 job.setOutputKeyClass(Text.class); // 指定v3的类型 job.setOutputValueClass(LongWritable.class); // 提交job job.waitForCompletion(true); }catch (Exception e){ e.printStackTrace(); } } }
(2)将任务打包成 Jar 包上传至 Hadoop 集群的任意一台机器上,并且执行如下命令向集群提交 Jar 包:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /words.dat /out/no_compress
(3)我们查看任务执行时的日志信息,主要查看 number of splits 后面的值,这个值表示针对任务的输入数据会产生几个 Split。这里显示的是 2,说明这个 200M 的文件会产生 2 个 Split,最终也只会产生 2 个 map 任务。
(4)任务执行完毕后查看生成的结果,可以发现由于没有启用压缩,生成的文件为 220MB。
3,执行 Deflate 压缩格式的任务
(1)我们使用如下命令提交任务,可以使用 Deflate 压缩格式对 Reduce 输出结果进行压缩:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec /words.dat /out/compress_deflate
(2)任务执行完毕后,查看最终结果文件只有 32MB:
4,执行 Gzip 压缩格式的任务
(1)我们使用如下命令提交任务,可以使用 Gzip 压缩格式对 Reduce 输出结果进行压缩:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec /words.dat /out/compress_gzip
(2)任务执行完毕后,查看最终结果文件同样只有 32MB:
5,执行 Bzip2 压缩格式的任务
(1)我们使用如下命令提交任务,可以使用 Bzip2 压缩格式对 Reduce 输出结果进行压缩:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec /words.dat /out/compress_bzip2
(2)任务执行完毕后,查看最终结果文件同样只有 18MB:
6,执行 Snappy 压缩格式的任务
(1)我们使用如下命令提交任务,可以使用 Snappy 压缩格式对 Reduce 输出结果进行压缩:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec /words.dat /out/compress_snappy
(2)任务执行完毕后,查看最终结果文件为 61MB:
7,执行 Lz4 压缩格式的任务
(1)我们使用如下命令提交任务,可以使用 Lz4 压缩格式对 Reduce 输出结果进行压缩:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.Lz4Codec /words.dat /out/compress_lz4
(2)任务执行完毕后,查看最终结果文件只有 53MB:
(3)由于我环境缺少 LZ4 相关库,上面命令执行时会报如下错误:
Error: java.lang.ClassNotFoundException: net.jpountz.lz4.LZ4Factory
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.hadoop.io.compress.lz4.Lz4Compressor.<init>(Lz4Compressor.java:66)
at org.apache.hadoop.io.compress.Lz4Codec.createCompressor(Lz4Codec.java:119)
(4)解决办法是访问 Maven 中央仓库(点击访问),将 LZ4 的 jar 包下载下来。
(5)然后将这个 jar 包上传到 Hadoop 集群的 hadoop 目录下的 share/hadoop/common 目录中:
注意:需要上传到集群中的所有节点里面。
全部评论(0)