Hadoop - MapReduce任务程序开发教程2(只包含map阶段样例)
作者:hangge | 2024-07-12 08:51
通常来说 MapReduce 任务是由 map 阶段和 reduce 阶段组成的,但是 reduce 阶段不是必须的,那也就意味着 MapReduce 程序可以只包含 map 阶段。下面我通过样例演示如何实现一个只包含 map 阶段的任务。
二、只包含 map 阶段样例
1,需求说明
(1)假设我们需要使用 MapReduce 实现一个单词转换功能,即读取 HDFS 上的 hello.txt 文件,将每个单词拆分后转换为大写。其中 hello.txt 文件内部分内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(2)MapReduce 代码开发流程如下:
- 开发 Map 阶段代码。
- 组装 MapReduce 任务。
- 对 MapReduce 任务打 Jar 包。
- 向集群提交 MapReduce 任务。
2,添加 Hadoop 相关的依赖
首先我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加 Hadoop 相关的依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
注意:此处需要在 hadoop-client 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 hadoop-client 依赖在 Hadoop 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
3,编写代码
这里我们编写一个MapReduce 任务类 WordCountJob,其内部包含如下几个部分:
- 首先自定义一个 MyMapper 类,它继承 MapReduce 框架中的 Mapper 抽象类,这里面是 Map 阶段的代码。
- 接着将 MyMapper 类组装起来,构建 MapReduce 任务。由于本次我们没有 reduce 阶段,因此在组装 Job 的时候设置 reduce 的 task 数目为 0 就可以了。
public class WordConverJob {
/**
* 创建自定义mapper类
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* 需要实现map函数
* 这个map函数就是可以接收k1,v1,产生k2,v2
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表的是每一行的行首偏移量,v1代表的是每一行内容
// 对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
// 迭代切割出来的单词数据
for (String word : words) {
// 把单词转换为大写
Text k2 = new Text(word.toUpperCase());
// 输出k2,v2
context.write(k2, null);
}
}
}
/**
* 组装job = 只有map
*/
public static void main(String[] args){
try{
if(args.length != 2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
// job需要配置的参数
// 创建一个配置对象
Configuration conf = new Configuration();
// 创建一个job
Job job = Job.getInstance(conf);
// 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
job.setJarByClass(WordConverJob.class);
// 指定输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 指定输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 指定map相关的代码
job.setMapperClass(MyMapper.class);
// 指定k2的类型
job.setMapOutputKeyClass(Text.class);
// 指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
// 不需要reduce阶段
job.setNumReduceTasks(0);
// 提交job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
4,对 MapReduce 任务打 Jar 包
(1)如果 MapReduce 代码开完毕后想要执行,则需要将其打为 Jar 包。此时需要在此项目对应的 pom.xml 文件中添加 Maven 的编译打包插件配置。
<build>
<plugins>
<!-- compiler 插件,设定 JDK 版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
(2)然后执行打 Jar 包的操作:

(3)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

5,准备测试数据
(1)我们登录服务器,创建一个 hello.txt
vi hello.txt
(2)文件内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(3)接着执行如下命令把测试数据上传至 HDFS:
hdfs dfs -put hello.txt /
6,向集群提交 MapReduce 任务
(1)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交 Jar 包:
注意:MapReduce 任务中指定的输出目录(/out)必须是一个之前不存在的目录,否则任务执行时会报错。
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordConverJob /hello.txt /out
(2)在将任务提交到集群后,可以在提交任务的命令行中看到如下日志信息。如果 map 执行到 100%,reduce 执行到 100%,则说明任务执行成功了。
2024-01-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0% 2024-01-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0% 2024-01-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%
(3)我们也可以使用浏览器访问 yarn 任务监控 web 页面,地址为 http://主节点IP:8088,查看任务执行情况:

(4)待任务结束后,我们可以执行如下命令查看任务输出的结果:
hdfs dfs -ls /out
- _SUCCESS 是一个标识文件,有这个文件表示这个任务执行成功了。
- part-r-00000 是具体的结果文件,如果有多个 Reduce Task,则会产生多个结果文件。多个文件会按照顺序编号:part-r-00001、part-r-00002 等。
(5)查看结果文件内容,可以看到具体的单词拆分转换结果:
hdfs dfs -cat /out/part-r-00000
全部评论(0)