Hadoop - 解决数据倾斜问题教程(增加reduce任务数+打散倾斜数据)
作者:hangge | 2024-08-19 08:38
我们知道 MapReduce 是分为 Map 阶段和 Reduce 阶段,其实提高执行效率就是提高这两个阶段的执行效率。默认情况下 Map 阶段中 Map 任务的个数是和数据的 InputSplit 相关的,而 InputSplit 的个数一般是和 Block 块是有关联的,所以可以认为 Map 任务的个数和数据的 block 块个数有关系。针对 Map 任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的。
那就剩下 Reduce 阶段了,默认情况下 reduce 的个数是 1 个,所以现在 MapReduce 任务的压力就集中在 Reduce 阶段。在数据量比较大的时候,一个 reduce 任务处理起来肯定是比较慢的,所以我们可以考虑增加 reduce 任务的个数,这样就可以实现数据分流了,提高计算效率了。
那就剩下 Reduce 阶段了,默认情况下 reduce 的个数是 1 个,所以现在 MapReduce 任务的压力就集中在 Reduce 阶段。在数据量比较大的时候,一个 reduce 任务处理起来肯定是比较慢的,所以我们可以考虑增加 reduce 任务的个数,这样就可以实现数据分流了,提高计算效率了。
一、增加 reduce 任务数
1,准备工作
(1)假设我们有一个文件,有 1000W 条数据,里面的内容如下:
8 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 8 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 6 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 4 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
(2)每一行开头都是数字,1,2,3,4,5,6,7,8,9,10,我们希望统计出来每个数字出现的次数:
- 假设这 1000W 条数据的文件有 3 个 block,会产生 3 个 InputSplt,最终会产生 3 个 Map 任务,默认情况下只有一个 reduce 任务,所以所有的数据都会让这一个 reduce 任务处理,这样这个 Reduce 压力肯定很大,大量的时间都消耗在了这里。
(3)我们可以增加 reduce 任务的数量,把 reduce 任务的数量调整到 10 个,这个时候就会把 1000w 条数据让这 10 个 reduce 任务并行处理了,这个时候效率肯定会有一定的提升。
2,实现样例
(1)如果想要多个分区,很简单,只需要把 numReduceTasks 的数目调大即可。
- numReduceTasks 其实就是 reduce 任务的数量,那也就意味着,只要 redcue 任务数量变大了,对应的分区数也就变多了,有多少个分区就会有多少个 reduce 任务,那我们就不需要单独增加分区的数量了,只需要控制好 Redcue 任务的数量即可。
(2)下面是完整的代码:
/**
* 增加Reduce任务个数
**/
public class WordCountJob {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//把单词封装成<k2,v2>的形式
Text k2 = new Text(words[0]);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
/**
* 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for(LongWritable v2: v2s){
//输出k2,v2的值
System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
//模拟Reduce的复杂计算消耗的时间
if(sum % 200 ==0){
Thread.sleep(1);
}
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=3){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
job.setJarByClass(WordCountJob.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
3,运行测试
(1)首先我们对项目代码进行编译、打包,提交到集群去执行,这里我指定 reduce 任务为 10 个:
(2)然后我们再到 yarn 的 web 界面查看任务的执行情况,可以看到 map 任务数量是 3,而 reduce 任务的数量确实增加到 10 个:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output 10
(2)然后我们再到 yarn 的 web 界面查看任务的执行情况,可以看到 map 任务数量是 3,而 reduce 任务的数量确实增加到 10 个:

(3)由于 reduce 任务的数量增加到 10 个,因此最终生成的结果文件也是 10 个(每个文件对应一个分区数据的统计结果)。

(4)我们可以通过如下命令查看所有的结果数据:
hdfs dfs -cat /output/*
二、数据倾斜问题解决
1,问题描述
(1)从上面的统计结果可以看到,值为5的数据有 840w 条左右,剩下的 9 个数字一共只有 160w 条,那也就意味着,这份数据中,值为 5 的数据比较集中,或者说值为 5 的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。
(2)虽然我们将 reduce 任务的数量增加到 10 个,但由于整个文件中值为 5 的数据占了大部分,这些数据只会被一个 reduce 任务处理,在这里假设是让 reduce5 处理了,reduce5 这个任务执行的是比较慢的,其他 reduce 任务都执行结束很长时间了,它还没执行结束,因为 reduce5 中处理的数据量和其他 reduce 中处理的数据量规模相差太大了,所以最终 reduce5 拖了后腿。我们 mapreduce 任务执行消耗的时间是一直统计到最后一个执行结束的 reduce 任务,所以就算其他 reduce 任务早都执行结束了也没有用,整个 mapreduce 任务是没有执行结束的。
- 到 yarn 的 web 界面查看任务的执行情况,由于有 10 个 reduce,所以一共有 10 行,在这我们截取了一部分。我们可以发现这里面有一个 reduce 任务消耗的时间比较长,其他 reduce 任务的执行时间都是 4~5 秒,这个 reduce 任务的执行时间是 1 分 26 秒,那就意味着值为 5 的那 860w 数据进入到这个 reduce 了,所以它执行的比较慢。
2,倾斜数据打散样例
(1)为了解决这个问题,我们就需要将倾斜的数据打散,即是把 5 这个数字打散。
- 比如:我把 5 这个数值的数据再分成 10 份,就在这个数值 5 后面拼上一个 0~9 的随机数即可。
(2)下面是完整的代码,与前面代码相比,我们只需要在 map 中把 k2 的值修改一下就可以了,这样就可以把值为 5 的数据打散了。
/**
* 数据倾斜-把倾斜的数据打散
**/
public class WordCountJob {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
Random random = new Random();
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//把单词封装成<k2,v2>的形式
String key = words[0];
if("5".equals(key)){
//把倾斜的key打散,分成10份
key = "5" + "_" + random.nextInt(10);
}
Text k2 = new Text(key);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
/**
* 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for(LongWritable v2: v2s){
//输出k2,v2的值
System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
//模拟Reduce的复杂计算消耗的时间
if(sum % 200 ==0){
Thread.sleep(1);
}
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=3){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
job.setJarByClass(WordCountJob.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
3,运行测试
(1)首先我们对项目代码进行编译、打包,提交到集群去执行,这里我指定 reduce 任务为 10 个:
(2)执行成功之后查看一下 reduce 任务执行情况,在这里就没有发现特别耗时的 reduce 任务了,消耗的时间几乎都差不多,这样就充分利用了 reduce 任务的性能。
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output 10
(3)查看结果显示如下:
注意:这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工,其实我们前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个 mapreduce 任务再做一次全局聚合,其实也很简单,获取到上一个 map 任务的输出,在 map 端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列,这样就可以把值为 5 的数据还原出来了,这个时候数据一共就这么十几条,怎么处理都很快了。
hdfs dfs -cat /output/*
全部评论(0)