Hadoop - 解决数据倾斜问题教程(增加reduce任务数+打散倾斜数据)
作者:hangge | 2024-08-19 08:38
我们知道 MapReduce 是分为 Map 阶段和 Reduce 阶段,其实提高执行效率就是提高这两个阶段的执行效率。默认情况下 Map 阶段中 Map 任务的个数是和数据的 InputSplit 相关的,而 InputSplit 的个数一般是和 Block 块是有关联的,所以可以认为 Map 任务的个数和数据的 block 块个数有关系。针对 Map 任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的。
那就剩下 Reduce 阶段了,默认情况下 reduce 的个数是 1 个,所以现在 MapReduce 任务的压力就集中在 Reduce 阶段。在数据量比较大的时候,一个 reduce 任务处理起来肯定是比较慢的,所以我们可以考虑增加 reduce 任务的个数,这样就可以实现数据分流了,提高计算效率了。
那就剩下 Reduce 阶段了,默认情况下 reduce 的个数是 1 个,所以现在 MapReduce 任务的压力就集中在 Reduce 阶段。在数据量比较大的时候,一个 reduce 任务处理起来肯定是比较慢的,所以我们可以考虑增加 reduce 任务的个数,这样就可以实现数据分流了,提高计算效率了。
一、增加 reduce 任务数
1,准备工作
(1)假设我们有一个文件,有 1000W 条数据,里面的内容如下:
8 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 8 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 6 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 4 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
(2)每一行开头都是数字,1,2,3,4,5,6,7,8,9,10,我们希望统计出来每个数字出现的次数:
- 假设这 1000W 条数据的文件有 3 个 block,会产生 3 个 InputSplt,最终会产生 3 个 Map 任务,默认情况下只有一个 reduce 任务,所以所有的数据都会让这一个 reduce 任务处理,这样这个 Reduce 压力肯定很大,大量的时间都消耗在了这里。
(3)我们可以增加 reduce 任务的数量,把 reduce 任务的数量调整到 10 个,这个时候就会把 1000w 条数据让这 10 个 reduce 任务并行处理了,这个时候效率肯定会有一定的提升。
2,实现样例
(1)如果想要多个分区,很简单,只需要把 numReduceTasks 的数目调大即可。
- numReduceTasks 其实就是 reduce 任务的数量,那也就意味着,只要 redcue 任务数量变大了,对应的分区数也就变多了,有多少个分区就会有多少个 reduce 任务,那我们就不需要单独增加分区的数量了,只需要控制好 Redcue 任务的数量即可。
(2)下面是完整的代码:
/** * 增加Reduce任务个数 **/ public class WordCountJob { /** * Map阶段 */ public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{ /** * 需要实现map函数 * 这个map函数就是可以接收<k1,v1>,产生<k2,v2> */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //输出k1,v1的值 System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); //把单词封装成<k2,v2>的形式 Text k2 = new Text(words[0]); LongWritable v2 = new LongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } } /** * Reduce阶段 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ /** * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //创建一个sum变量,保存v2s的和 long sum = 0L; //对v2s中的数据进行累加求和 for(LongWritable v2: v2s){ //输出k2,v2的值 System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum += v2.get(); //模拟Reduce的复杂计算消耗的时间 if(sum % 200 ==0){ Thread.sleep(1); } } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出k3,v3的值 System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); // 把结果写出去 context.write(k3,v3); } } /** * 组装Job=Map+Reduce */ public static void main(String[] args) { try{ if(args.length!=3){ //如果传递的参数不够,程序直接退出 System.exit(100); } //指定Job需要的配置参数 Configuration conf = new Configuration(); //创建一个Job Job job = Job.getInstance(conf); //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的 job.setJarByClass(WordCountJob.class); //指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job,new Path(args[0])); //指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job,new Path(args[1])); //指定map相关的代码 job.setMapperClass(MyMapper.class); //指定k2的类型 job.setMapOutputKeyClass(Text.class); //指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //指定reduce相关的代码 job.setReducerClass(MyReducer.class); //指定k3的类型 job.setOutputKeyClass(Text.class); //指定v3的类型 job.setOutputValueClass(LongWritable.class); //设置reduce任务个数 job.setNumReduceTasks(Integer.parseInt(args[2])); //提交job job.waitForCompletion(true); }catch(Exception e){ e.printStackTrace(); } } }
3,运行测试
(1)首先我们对项目代码进行编译、打包,提交到集群去执行,这里我指定 reduce 任务为 10 个:
(2)然后我们再到 yarn 的 web 界面查看任务的执行情况,可以看到 map 任务数量是 3,而 reduce 任务的数量确实增加到 10 个:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output 10
(2)然后我们再到 yarn 的 web 界面查看任务的执行情况,可以看到 map 任务数量是 3,而 reduce 任务的数量确实增加到 10 个:
(3)由于 reduce 任务的数量增加到 10 个,因此最终生成的结果文件也是 10 个(每个文件对应一个分区数据的统计结果)。
(4)我们可以通过如下命令查看所有的结果数据:
hdfs dfs -cat /output/*
二、数据倾斜问题解决
1,问题描述
(1)从上面的统计结果可以看到,值为5的数据有 840w 条左右,剩下的 9 个数字一共只有 160w 条,那也就意味着,这份数据中,值为 5 的数据比较集中,或者说值为 5 的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。
(2)虽然我们将 reduce 任务的数量增加到 10 个,但由于整个文件中值为 5 的数据占了大部分,这些数据只会被一个 reduce 任务处理,在这里假设是让 reduce5 处理了,reduce5 这个任务执行的是比较慢的,其他 reduce 任务都执行结束很长时间了,它还没执行结束,因为 reduce5 中处理的数据量和其他 reduce 中处理的数据量规模相差太大了,所以最终 reduce5 拖了后腿。我们 mapreduce 任务执行消耗的时间是一直统计到最后一个执行结束的 reduce 任务,所以就算其他 reduce 任务早都执行结束了也没有用,整个 mapreduce 任务是没有执行结束的。
- 到 yarn 的 web 界面查看任务的执行情况,由于有 10 个 reduce,所以一共有 10 行,在这我们截取了一部分。我们可以发现这里面有一个 reduce 任务消耗的时间比较长,其他 reduce 任务的执行时间都是 4~5 秒,这个 reduce 任务的执行时间是 1 分 26 秒,那就意味着值为 5 的那 860w 数据进入到这个 reduce 了,所以它执行的比较慢。
2,倾斜数据打散样例
(1)为了解决这个问题,我们就需要将倾斜的数据打散,即是把 5 这个数字打散。
- 比如:我把 5 这个数值的数据再分成 10 份,就在这个数值 5 后面拼上一个 0~9 的随机数即可。
(2)下面是完整的代码,与前面代码相比,我们只需要在 map 中把 k2 的值修改一下就可以了,这样就可以把值为 5 的数据打散了。
/** * 数据倾斜-把倾斜的数据打散 **/ public class WordCountJob { /** * Map阶段 */ public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{ Random random = new Random(); /** * 需要实现map函数 * 这个map函数就是可以接收<k1,v1>,产生<k2,v2> */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //输出k1,v1的值 System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); //把单词封装成<k2,v2>的形式 String key = words[0]; if("5".equals(key)){ //把倾斜的key打散,分成10份 key = "5" + "_" + random.nextInt(10); } Text k2 = new Text(key); LongWritable v2 = new LongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } } /** * Reduce阶段 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ /** * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //创建一个sum变量,保存v2s的和 long sum = 0L; //对v2s中的数据进行累加求和 for(LongWritable v2: v2s){ //输出k2,v2的值 System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum += v2.get(); //模拟Reduce的复杂计算消耗的时间 if(sum % 200 ==0){ Thread.sleep(1); } } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出k3,v3的值 System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); // 把结果写出去 context.write(k3,v3); } } /** * 组装Job=Map+Reduce */ public static void main(String[] args) { try{ if(args.length!=3){ //如果传递的参数不够,程序直接退出 System.exit(100); } //指定Job需要的配置参数 Configuration conf = new Configuration(); //创建一个Job Job job = Job.getInstance(conf); //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的 job.setJarByClass(WordCountJob.class); //指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job,new Path(args[0])); //指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job,new Path(args[1])); //指定map相关的代码 job.setMapperClass(MyMapper.class); //指定k2的类型 job.setMapOutputKeyClass(Text.class); //指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //指定reduce相关的代码 job.setReducerClass(MyReducer.class); //指定k3的类型 job.setOutputKeyClass(Text.class); //指定v3的类型 job.setOutputValueClass(LongWritable.class); //设置reduce任务个数 job.setNumReduceTasks(Integer.parseInt(args[2])); //提交job job.waitForCompletion(true); }catch(Exception e){ e.printStackTrace(); } } }
3,运行测试
(1)首先我们对项目代码进行编译、打包,提交到集群去执行,这里我指定 reduce 任务为 10 个:
(2)执行成功之后查看一下 reduce 任务执行情况,在这里就没有发现特别耗时的 reduce 任务了,消耗的时间几乎都差不多,这样就充分利用了 reduce 任务的性能。
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output 10
(3)查看结果显示如下:
注意:这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工,其实我们前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个 mapreduce 任务再做一次全局聚合,其实也很简单,获取到上一个 map 任务的输出,在 map 端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列,这样就可以把值为 5 的数据还原出来了,这个时候数据一共就这么十几条,怎么处理都很快了。
hdfs dfs -cat /output/*
全部评论(0)