返回 导航

大数据

hangge.com

Hadoop - MapReduce任务程序开发教程2(只包含map阶段样例)

作者:hangge | 2024-07-12 08:51
    通常来说 MapReduce 任务是由 map 阶段和 reduce 阶段组成的,但是 reduce 阶段不是必须的,那也就意味着 MapReduce 程序可以只包含 map 阶段。下面我通过样例演示如何实现一个只包含 map 阶段的任务。

二、只包含 map 阶段样例

1,需求说明

(1)假设我们需要使用 MapReduce 实现一个单词转换功能,即读取 HDFS 上的 hello.txt 文件,将每个单词拆分后转换为大写。其中 hello.txt 文件内部分内容如下:
hello hangge.com
welcome to hangge.com
hello world
bye world

(2)MapReduce 代码开发流程如下:
  • 开发 Map 阶段代码。
  • 组装 MapReduce 任务。
  • MapReduce 任务打 Jar 包。
  • 向集群提交 MapReduce 任务。

2,添加 Hadoop 相关的依赖

首先我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加 Hadoop 相关的依赖:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
    <scope>provided</scope>
</dependency>
注意:此处需要在 hadoop-client 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 hadoop-client 依赖在 Hadoop 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。

3,编写代码

这里我们编写一个MapReduce 任务类 WordCountJob,其内部包含如下几个部分:
  • 首先自定义一个 MyMapper 类,它继承 MapReduce 框架中的 Mapper 抽象类,这里面是 Map 阶段的代码。
  • 接着将 MyMapper 类组装起来,构建 MapReduce 任务。由于本次我们没有 reduce 阶段,因此在组装 Job 的时候设置 reduce task 数目为 0 就可以了。
public class WordConverJob {
  /**
   * 创建自定义mapper类
   */
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    /**
     * 需要实现map函数
     * 这个map函数就是可以接收k1,v1,产生k2,v2
     * @param k1
     * @param v1
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
      // k1代表的是每一行的行首偏移量,v1代表的是每一行内容
      // 对获取到的每一行数据进行切割,把单词切割出来
      String[] words = v1.toString().split(" ");

      // 迭代切割出来的单词数据
      for (String word : words) {
        // 把单词转换为大写
        Text k2 = new Text(word.toUpperCase());
        // 输出k2,v2
        context.write(k2, null);
      }
    }
  }

  /**
   * 组装job = 只有map
   */
  public static void main(String[] args){
    try{
      if(args.length != 2){
        //如果传递的参数不够,程序直接退出
        System.exit(100);
      }

      // job需要配置的参数
      // 创建一个配置对象
      Configuration conf = new Configuration();
      // 创建一个job
      Job job = Job.getInstance(conf);

      // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
      job.setJarByClass(WordConverJob.class);

      // 指定输入路径
      FileInputFormat.setInputPaths(job,new Path(args[0]));
      // 指定输出路径
      FileOutputFormat.setOutputPath(job,new Path(args[1]));

      // 指定map相关的代码
      job.setMapperClass(MyMapper.class);
      // 指定k2的类型
      job.setMapOutputKeyClass(Text.class);
      // 指定v2的类型
      job.setMapOutputValueClass(LongWritable.class);

      // 不需要reduce阶段
      job.setNumReduceTasks(0);

      // 提交job
      job.waitForCompletion(true);
    }catch (Exception e){
      e.printStackTrace();
    }
  }
}

4,对 MapReduce 任务打 Jar 包

(1)如果 MapReduce 代码开完毕后想要执行,则需要将其打为 Jar 包。此时需要在此项目对应的 pom.xml 文件中添加 Maven 的编译打包插件配置。
<build>
    <plugins>
        <!-- compiler 插件,设定 JDK 版本 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
                <showWarnings>true</showWarnings>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(2)然后执行打 Jar 包的操作:

(3)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

5,准备测试数据

(1)我们登录服务器,创建一个 hello.txt
vi hello.txt

(2)文件内容如下:
hello hangge.com
welcome to hangge.com
hello world
bye world

(3)接着执行如下命令把测试数据上传至 HDFS
hdfs dfs -put hello.txt /

6,向集群提交 MapReduce 任务

(1)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交 Jar 包:
注意MapReduce 任务中指定的输出目录(/out)必须是一个之前不存在的目录,否则任务执行时会报错。
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordConverJob /hello.txt /out

(2)在将任务提交到集群后,可以在提交任务的命令行中看到如下日志信息。如果 map 执行到 100%reduce 执行到 100%,则说明任务执行成功了。
2024-01-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0%
2024-01-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0%
2024-01-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%

(3)我们也可以使用浏览器访问 yarn 任务监控 web 页面,地址为 http://主节点IP:8088,查看任务执行情况:

(4)待任务结束后,我们可以执行如下命令查看任务输出的结果:
hdfs dfs -ls /out

(5)查看结果文件内容,可以看到具体的单词拆分转换结果:
hdfs dfs -cat /out/part-r-00000
评论

全部评论(0)

回到顶部