Hadoop - MapReduce任务程序开发教程2(只包含map阶段样例)
作者:hangge | 2024-07-12 08:51
通常来说 MapReduce 任务是由 map 阶段和 reduce 阶段组成的,但是 reduce 阶段不是必须的,那也就意味着 MapReduce 程序可以只包含 map 阶段。下面我通过样例演示如何实现一个只包含 map 阶段的任务。
二、只包含 map 阶段样例
1,需求说明
(1)假设我们需要使用 MapReduce 实现一个单词转换功能,即读取 HDFS 上的 hello.txt 文件,将每个单词拆分后转换为大写。其中 hello.txt 文件内部分内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(2)MapReduce 代码开发流程如下:
- 开发 Map 阶段代码。
- 组装 MapReduce 任务。
- 对 MapReduce 任务打 Jar 包。
- 向集群提交 MapReduce 任务。
2,添加 Hadoop 相关的依赖
首先我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加 Hadoop 相关的依赖:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
注意:此处需要在 hadoop-client 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 hadoop-client 依赖在 Hadoop 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
3,编写代码
这里我们编写一个MapReduce 任务类 WordCountJob,其内部包含如下几个部分:
- 首先自定义一个 MyMapper 类,它继承 MapReduce 框架中的 Mapper 抽象类,这里面是 Map 阶段的代码。
- 接着将 MyMapper 类组装起来,构建 MapReduce 任务。由于本次我们没有 reduce 阶段,因此在组装 Job 的时候设置 reduce 的 task 数目为 0 就可以了。
public class WordConverJob { /** * 创建自定义mapper类 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1,产生k2,v2 * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); // 迭代切割出来的单词数据 for (String word : words) { // 把单词转换为大写 Text k2 = new Text(word.toUpperCase()); // 输出k2,v2 context.write(k2, null); } } } /** * 组装job = 只有map */ public static void main(String[] args){ try{ if(args.length != 2){ //如果传递的参数不够,程序直接退出 System.exit(100); } // job需要配置的参数 // 创建一个配置对象 Configuration conf = new Configuration(); // 创建一个job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordConverJob.class); // 指定输入路径 FileInputFormat.setInputPaths(job,new Path(args[0])); // 指定输出路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); // 不需要reduce阶段 job.setNumReduceTasks(0); // 提交job job.waitForCompletion(true); }catch (Exception e){ e.printStackTrace(); } } }
4,对 MapReduce 任务打 Jar 包
(1)如果 MapReduce 代码开完毕后想要执行,则需要将其打为 Jar 包。此时需要在此项目对应的 pom.xml 文件中添加 Maven 的编译打包插件配置。
<build> <plugins> <!-- compiler 插件,设定 JDK 版本 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <encoding>UTF-8</encoding> <source>1.8</source> <target>1.8</target> <showWarnings>true</showWarnings> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
(2)然后执行打 Jar 包的操作:
(3)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。
5,准备测试数据
(1)我们登录服务器,创建一个 hello.txt
vi hello.txt
(2)文件内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(3)接着执行如下命令把测试数据上传至 HDFS:
hdfs dfs -put hello.txt /
6,向集群提交 MapReduce 任务
(1)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交 Jar 包:
注意:MapReduce 任务中指定的输出目录(/out)必须是一个之前不存在的目录,否则任务执行时会报错。
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordConverJob /hello.txt /out
(2)在将任务提交到集群后,可以在提交任务的命令行中看到如下日志信息。如果 map 执行到 100%,reduce 执行到 100%,则说明任务执行成功了。
2024-01-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0% 2024-01-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0% 2024-01-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%
(3)我们也可以使用浏览器访问 yarn 任务监控 web 页面,地址为 http://主节点IP:8088,查看任务执行情况:
(4)待任务结束后,我们可以执行如下命令查看任务输出的结果:
hdfs dfs -ls /out
- _SUCCESS 是一个标识文件,有这个文件表示这个任务执行成功了。
- part-r-00000 是具体的结果文件,如果有多个 Reduce Task,则会产生多个结果文件。多个文件会按照顺序编号:part-r-00001、part-r-00002 等。
(5)查看结果文件内容,可以看到具体的单词拆分转换结果:
hdfs dfs -cat /out/part-r-00000
全部评论(0)