返回 导航

大数据

hangge.com

Hadoop - 解决数据倾斜问题教程(增加reduce任务数+打散倾斜数据)

作者:hangge | 2024-08-19 08:38
    我们知道 MapReduce 是分为 Map 阶段和 Reduce 阶段,其实提高执行效率就是提高这两个阶段的执行效率。默认情况下 Map 阶段中 Map 任务的个数是和数据的 InputSplit 相关的,而 InputSplit 的个数一般是和 Block 块是有关联的,所以可以认为 Map 任务的个数和数据的 block 块个数有关系。针对 Map 任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的。
    那就剩下 Reduce 阶段了,默认情况下 reduce 的个数是 1 个,所以现在 MapReduce 任务的压力就集中在 Reduce 阶段。在数据量比较大的时候,一个 reduce 任务处理起来肯定是比较慢的,所以我们可以考虑增加 reduce 任务的个数,这样就可以实现数据分流了,提高计算效率了。

一、增加 reduce 任务数

1,准备工作

(1)假设我们有一个文件,有 1000W 条数据,里面的内容如下:
8 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
8 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
6 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
4 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false

(2)每一行开头都是数字,1,2,3,4,5,6,7,8,9,10,我们希望统计出来每个数字出现的次数:
  • 假设这 1000W 条数据的文件有 3block,会产生 3InputSplt,最终会产生 3Map 任务,默认情况下只有一个 reduce 任务,所以所有的数据都会让这一个 reduce 任务处理,这样这个 Reduce 压力肯定很大,大量的时间都消耗在了这里。

(3)我们可以增加 reduce 任务的数量,把 reduce 任务的数量调整到 10 个,这个时候就会把 1000w 条数据让这 10 reduce 任务并行处理了,这个时候效率肯定会有一定的提升。

2,实现样例

(1)如果想要多个分区,很简单,只需要把 numReduceTasks 的数目调大即可。
  • numReduceTasks 其实就是 reduce 任务的数量,那也就意味着,只要 redcue 任务数量变大了,对应的分区数也就变多了,有多少个分区就会有多少个 reduce 任务,那我们就不需要单独增加分区的数量了,只需要控制好 Redcue 任务的数量即可。

(2)下面是完整的代码:
/**
 * 增加Reduce任务个数
 **/
public class WordCountJob {
  /**
   * Map阶段
   */
  public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
    /**
     * 需要实现map函数
     * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
      //输出k1,v1的值
      System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
      //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
      //对获取到的每一行数据进行切割,把单词切割出来
      String[] words = v1.toString().split(" ");
      //把单词封装成<k2,v2>的形式
      Text k2 = new Text(words[0]);
      LongWritable v2 = new LongWritable(1L);
      //把<k2,v2>写出去
      context.write(k2,v2);
    }
  }

  /**
   * Reduce阶段
   */
  public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    /**
     * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
      //创建一个sum变量,保存v2s的和
      long sum = 0L;

      //对v2s中的数据进行累加求和
      for(LongWritable v2: v2s){
        //输出k2,v2的值
        System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
        sum += v2.get();
        //模拟Reduce的复杂计算消耗的时间
        if(sum % 200 ==0){
          Thread.sleep(1);
        }
      }

      //组装k3,v3
      Text k3 = k2;
      LongWritable v3 = new LongWritable(sum);
      //输出k3,v3的值
      System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
      // 把结果写出去
      context.write(k3,v3);
    }
  }

  /**
   * 组装Job=Map+Reduce
   */
  public static void main(String[] args) {
    try{
      if(args.length!=3){
        //如果传递的参数不够,程序直接退出
        System.exit(100);
      }

      //指定Job需要的配置参数
      Configuration conf = new Configuration();
      //创建一个Job
      Job job = Job.getInstance(conf);

      //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
      job.setJarByClass(WordCountJob.class);

      //指定输入路径(可以是文件,也可以是目录)
      FileInputFormat.setInputPaths(job,new Path(args[0]));
      //指定输出路径(只能指定一个不存在的目录)
      FileOutputFormat.setOutputPath(job,new Path(args[1]));

      //指定map相关的代码
      job.setMapperClass(MyMapper.class);
      //指定k2的类型
      job.setMapOutputKeyClass(Text.class);
      //指定v2的类型
      job.setMapOutputValueClass(LongWritable.class);

      //指定reduce相关的代码
      job.setReducerClass(MyReducer.class);
      //指定k3的类型
      job.setOutputKeyClass(Text.class);
      //指定v3的类型
      job.setOutputValueClass(LongWritable.class);
      //设置reduce任务个数
      job.setNumReduceTasks(Integer.parseInt(args[2]));

      //提交job
      job.waitForCompletion(true);
    }catch(Exception e){
      e.printStackTrace();
    }
  }
}

3,运行测试

(1)首先我们对项目代码进行编译、打包,提交到集群去执行,这里我指定 reduce 任务为 10 个:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output 10

(2)然后我们再到 yarn web 界面查看任务的执行情况,可以看到 map 任务数量是 3,而 reduce 任务的数量确实增加到 10 个:

(3)由于 reduce 任务的数量增加到 10 个,因此最终生成的结果文件也是 10 个(每个文件对应一个分区数据的统计结果)。

(4)我们可以通过如下命令查看所有的结果数据:
hdfs dfs -cat /output/*

二、数据倾斜问题解决

1,问题描述

(1)从上面的统计结果可以看到,值为5的数据有 840w 条左右,剩下的 9 个数字一共只有 160w 条,那也就意味着,这份数据中,值为 5 的数据比较集中,或者说值为 5 的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。

(2)虽然我们将 reduce 任务的数量增加到 10 个,但由于整个文件中值为 5 的数据占了大部分,这些数据只会被一个 reduce 任务处理,在这里假设是让 reduce5 处理了,reduce5 这个任务执行的是比较慢的,其他 reduce 任务都执行结束很长时间了,它还没执行结束,因为 reduce5 中处理的数据量和其他 reduce 中处理的数据量规模相差太大了,所以最终 reduce5 拖了后腿。我们 mapreduce 任务执行消耗的时间是一直统计到最后一个执行结束的 reduce 任务,所以就算其他 reduce 任务早都执行结束了也没有用,整个 mapreduce 任务是没有执行结束的。
  • yarn web 界面查看任务的执行情况,由于有 10 reduce,所以一共有 10 行,在这我们截取了一部分。我们可以发现这里面有一个 reduce 任务消耗的时间比较长,其他 reduce 任务的执行时间都是 4~5 秒,这个 reduce 任务的执行时间是 1 26 秒,那就意味着值为 5 的那 860w 数据进入到这个 reduce 了,所以它执行的比较慢。

2,倾斜数据打散样例

(1)为了解决这个问题,我们就需要将倾斜的数据打散,即是把 5 这个数字打散。
  • 比如:我把 5 这个数值的数据再分成 10 份,就在这个数值 5 后面拼上一个 0~9 的随机数即可。
(2)下面是完整的代码,与前面代码相比,我们只需要在 map 中把 k2 的值修改一下就可以了,这样就可以把值为 5 的数据打散了。
/**
 * 数据倾斜-把倾斜的数据打散
 **/
public class WordCountJob {
  /**
   * Map阶段
   */
  public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{

    Random random = new Random();

    /**
     * 需要实现map函数
     * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
      //输出k1,v1的值
      System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
      //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
      //对获取到的每一行数据进行切割,把单词切割出来
      String[] words = v1.toString().split(" ");
      //把单词封装成<k2,v2>的形式
      String key = words[0];
      if("5".equals(key)){
        //把倾斜的key打散,分成10份
        key = "5" + "_" + random.nextInt(10);
      }
      Text k2 = new Text(key);
      LongWritable v2 = new LongWritable(1L);
      //把<k2,v2>写出去
      context.write(k2,v2);
    }
  }

  /**
   * Reduce阶段
   */
  public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    /**
     * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
      //创建一个sum变量,保存v2s的和
      long sum = 0L;

      //对v2s中的数据进行累加求和
      for(LongWritable v2: v2s){
        //输出k2,v2的值
        System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
        sum += v2.get();
        //模拟Reduce的复杂计算消耗的时间
        if(sum % 200 ==0){
          Thread.sleep(1);
        }
      }

      //组装k3,v3
      Text k3 = k2;
      LongWritable v3 = new LongWritable(sum);
      //输出k3,v3的值
      System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
      // 把结果写出去
      context.write(k3,v3);
    }
  }

  /**
   * 组装Job=Map+Reduce
   */
  public static void main(String[] args) {
    try{
      if(args.length!=3){
        //如果传递的参数不够,程序直接退出
        System.exit(100);
      }

      //指定Job需要的配置参数
      Configuration conf = new Configuration();
      //创建一个Job
      Job job = Job.getInstance(conf);

      //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
      job.setJarByClass(WordCountJob.class);

      //指定输入路径(可以是文件,也可以是目录)
      FileInputFormat.setInputPaths(job,new Path(args[0]));
      //指定输出路径(只能指定一个不存在的目录)
      FileOutputFormat.setOutputPath(job,new Path(args[1]));

      //指定map相关的代码
      job.setMapperClass(MyMapper.class);
      //指定k2的类型
      job.setMapOutputKeyClass(Text.class);
      //指定v2的类型
      job.setMapOutputValueClass(LongWritable.class);

      //指定reduce相关的代码
      job.setReducerClass(MyReducer.class);
      //指定k3的类型
      job.setOutputKeyClass(Text.class);
      //指定v3的类型
      job.setOutputValueClass(LongWritable.class);
      //设置reduce任务个数
      job.setNumReduceTasks(Integer.parseInt(args[2]));

      //提交job
      job.waitForCompletion(true);
    }catch(Exception e){
      e.printStackTrace();
    }
  }
}

3,运行测试

(1)首先我们对项目代码进行编译、打包,提交到集群去执行,这里我指定 reduce 任务为 10 个:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output 10

(2)执行成功之后查看一下 reduce 任务执行情况,在这里就没有发现特别耗时的 reduce 任务了,消耗的时间几乎都差不多,这样就充分利用了 reduce 任务的性能。

(3)查看结果显示如下:
注意:这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工,其实我们前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个 mapreduce 任务再做一次全局聚合,其实也很简单,获取到上一个 map 任务的输出,在 map 端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列,这样就可以把值为 5 的数据还原出来了,这个时候数据一共就这么十几条,怎么处理都很快了。
hdfs dfs -cat /output/*
评论

全部评论(0)

回到顶部