返回 导航

大数据

hangge.com

Hadoop - MapReduce任务程序开发教程1(WordCount功能实现样例)

作者:hangge | 2024-07-11 08:59

一、WordCount 功能实现样例

1,需求说明

(1)假设我们需要使用 MapReduce 实现一个 WordCount 功能,即读取 HDFS 上的 hello.txt 文件,计算文件中每个单词出现的总次数。其中 hello.txt 文件内部分内容如下:
hello hangge.com
welcome to hangge.com
hello world
bye world

(2)MapReduce 代码开发流程如下:
  • 开发 Map 阶段代码。
  • 开发 Reduce 阶段代码。
  • 组装 MapReduce 任务。
  • MapReduce 任务打 Jar 包。
  • 向集群提交 MapReduce 任务。

2,添加 Hadoop 相关的依赖

首先我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加 Hadoop 相关的依赖:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
    <scope>provided</scope>
</dependency>
注意:此处需要在 hadoop-client 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 hadoop-client 依赖在 Hadoop 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。

3,编写代码

这里我们编写一个MapReduce 任务类 WordCountJob,其内部包含如下几个部分:
  • 首先自定义一个 MyMapper 类,它继承 MapReduce 框架中的 Mapper 抽象类,这里面是 Map 阶段的代码。
  • 接着自定义一个 MyReducer 类,它继承 MapReduce 框架中的 Reducer 抽象类,这里面是 Reduce 阶段的代码。
  • 最后将 MyMapper 类和 MyReducer 类组装起来,构建 MapReduce 任务。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountJob {
  /**
   * 创建自定义mapper类
   */
  public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    /**
     * 需要实现map函数
     * 这个map函数就是可以接收k1,v1,产生k2,v2
     * @param k1
     * @param v1
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
      // k1代表的是每一行的行首偏移量,v1代表的是每一行内容
      // 对获取到的每一行数据进行切割,把单词切割出来
      String[] words = v1.toString().split(" ");

      // 迭代切割出来的单词数据
      for(String word:words){
        // 把迭代出来的单词封装成<k2,v2>的形式
        Text k2 = new Text(word);
        LongWritable v2 = new LongWritable(1L);
        // 输出k2,v2
        context.write(k2,v2);
      }
    }
  }

  /**
   * 创建自定义的reducer类
   */
  public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    /**
     * 针对v2s的数据进行累加求和
     * @param k2
     * @param v2s
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
      //v2s {1,1,1,1}
      //创建一个sum变量,保存v2s的和
      long sum = 0L;
      for(LongWritable v2:v2s){
        sum += v2.get();
      }
      //组装k3,v3
      Text k3 = k2;
      LongWritable v3 = new LongWritable(sum);

      //输出结果
      context.write(k3,v3);
    }
  }

  /**
   * 组装job = map + reduce
   */
  public static void main(String[] args){
    try{
      if(args.length != 2){
        //如果传递的参数不够,程序直接退出
        System.exit(100);
      }

      // job需要配置的参数
      // 创建一个配置对象
      Configuration conf = new Configuration();
      // 创建一个job
      Job job = Job.getInstance(conf);

      // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
      job.setJarByClass(WordCountJob.class);

      // 指定输入路径
      FileInputFormat.setInputPaths(job,new Path(args[0]));
      // 指定输出路径
      FileOutputFormat.setOutputPath(job,new Path(args[1]));

      // 指定map相关的代码
      job.setMapperClass(MyMapper.class);
      // 指定k2的类型
      job.setMapOutputKeyClass(Text.class);
      // 指定v2的类型
      job.setMapOutputValueClass(LongWritable.class);

      // 指定reduce相关的代码
      job.setReducerClass(MyReducer.class);
      // 指定k3的类型
      job.setOutputKeyClass(Text.class);
      // 指定v3的类型
      job.setOutputValueClass(LongWritable.class);

      // 提交job
      job.waitForCompletion(true);
    }catch (Exception e){
      e.printStackTrace();
    }
  }
}

4,对 MapReduce 任务打 Jar 包

(1)如果 MapReduce 代码开完毕后想要执行,则需要将其打为 Jar 包。此时需要在此项目对应的 pom.xml 文件中添加 Maven 的编译打包插件配置。
<build>
    <plugins>
        <!-- compiler 插件,设定 JDK 版本 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
                <showWarnings>true</showWarnings>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(2)然后执行打 Jar 包的操作:

(3)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

5,准备测试数据

(1)我们登录服务器,创建一个 hello.txt
vi hello.txt

(2)文件内容如下:
hello hangge.com
welcome to hangge.com
hello world
bye world

(3)接着执行如下命令把测试数据上传至 HDFS
hdfs dfs -put hello.txt /

6,向集群提交 MapReduce 任务

(1)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交 Jar 包:
注意MapReduce 任务中指定的输出目录(/out)必须是一个之前不存在的目录,否则任务执行时会报错。
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob /hello.txt /out

(2)在将任务提交到集群后,可以在提交任务的命令行中看到如下日志信息。如果 map 执行到 100%reduce 执行到 100%,则说明任务执行成功了。
2024-01-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0%
2024-01-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0%
2024-01-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%

(3)我们也可以使用浏览器访问 yarn 任务监控 web 页面,地址为 http://主节点IP:8088,查看任务执行情况:

(4)待任务结束后,我们可以执行如下命令查看任务输出的结果:
hdfs dfs -ls /out

(5)查看结果文件内容,可以看到具体的单词统计结果:
hdfs dfs -cat /out/part-r-00000

附:指定目录作为输入

1,将目录作为输入路径

(1)假设我们在 HDFS /input 目录下有两个文件 hello1.txthello2.txt

(2)如果我们希望 Hadoop 会读取该目录下的所有文件进行计算,只需要运行时将该目录路径作为第一个输入参数即可:
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output

(3)查看日志可以发现一共产生了 2map 任务,说明两个文件都读取到了:

(4)查看结果数据,可以发现统计结果也是从两个文件中计算汇总而来:

2,递归遍历子目录

(1)Hadoop 默认情况下不会递归遍历子目录,如果我们的 /input 目录下还有个 2024 子目录,该子目录下有个 hello3.txt

(2)当我们运行任务会发现,不仅 hello3.txt 这个文件没有读取到,并且整个任务会失败。
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output

(3)这是因为 Hadoop 默认情况下不会递归遍历子目录。为了使 Hadoop 递归地读取目录及其所有子目录中的文件,您需要使用 FileInputFormat.setInputDirRecursive 方法。下面是修改后的代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountJob {
  /**
   * 创建自定义mapper类
   */
  public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    /**
     * 需要实现map函数
     * 这个map函数就是可以接收k1,v1,产生k2,v2
     * @param k1
     * @param v1
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
      // k1代表的是每一行的行首偏移量,v1代表的是每一行内容
      // 对获取到的每一行数据进行切割,把单词切割出来
      String[] words = v1.toString().split(" ");

      // 迭代切割出来的单词数据
      for(String word:words){
        // 把迭代出来的单词封装成<k2,v2>的形式
        Text k2 = new Text(word);
        LongWritable v2 = new LongWritable(1L);
        // 输出k2,v2
        context.write(k2,v2);
      }
    }
  }

  /**
   * 创建自定义的reducer类
   */
  public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    /**
     * 针对v2s的数据进行累加求和
     * @param k2
     * @param v2s
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
      //v2s {1,1,1,1}
      //创建一个sum变量,保存v2s的和
      long sum = 0L;
      for(LongWritable v2:v2s){
        sum += v2.get();
      }
      //组装k3,v3
      Text k3 = k2;
      LongWritable v3 = new LongWritable(sum);

      //输出结果
      context.write(k3,v3);
    }
  }

  /**
   * 组装job = map + reduce
   */
  public static void main(String[] args){
    try{
      if(args.length != 2){
        //如果传递的参数不够,程序直接退出
        System.exit(100);
      }

      // job需要配置的参数
      // 创建一个配置对象
      Configuration conf = new Configuration();
      // 创建一个job
      Job job = Job.getInstance(conf);

      // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
      job.setJarByClass(WordCountJob.class);

      // 指定输入路径
      FileInputFormat.setInputPaths(job,new Path(args[0]));
      // 启用递归目录遍历
      FileInputFormat.setInputDirRecursive(job, true);
      // 指定输出路径
      FileOutputFormat.setOutputPath(job,new Path(args[1]));

      // 指定map相关的代码
      job.setMapperClass(MyMapper.class);
      // 指定k2的类型
      job.setMapOutputKeyClass(Text.class);
      // 指定v2的类型
      job.setMapOutputValueClass(LongWritable.class);

      // 指定reduce相关的代码
      job.setReducerClass(MyReducer.class);
      // 指定k3的类型
      job.setOutputKeyClass(Text.class);
      // 指定v3的类型
      job.setOutputValueClass(LongWritable.class);

      // 提交job
      job.waitForCompletion(true);
    }catch (Exception e){
      e.printStackTrace();
    }
  }
}
评论

全部评论(0)

回到顶部