Hadoop - MapReduce任务程序开发教程1(WordCount功能实现样例)
作者:hangge | 2024-07-11 08:59
一、WordCount 功能实现样例
1,需求说明
(1)假设我们需要使用 MapReduce 实现一个 WordCount 功能,即读取 HDFS 上的 hello.txt 文件,计算文件中每个单词出现的总次数。其中 hello.txt 文件内部分内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(2)MapReduce 代码开发流程如下:
- 开发 Map 阶段代码。
- 开发 Reduce 阶段代码。
- 组装 MapReduce 任务。
- 对 MapReduce 任务打 Jar 包。
- 向集群提交 MapReduce 任务。
2,添加 Hadoop 相关的依赖
首先我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加 Hadoop 相关的依赖:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
注意:此处需要在 hadoop-client 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 hadoop-client 依赖在 Hadoop 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
3,编写代码
这里我们编写一个MapReduce 任务类 WordCountJob,其内部包含如下几个部分:
- 首先自定义一个 MyMapper 类,它继承 MapReduce 框架中的 Mapper 抽象类,这里面是 Map 阶段的代码。
- 接着自定义一个 MyReducer 类,它继承 MapReduce 框架中的 Reducer 抽象类,这里面是 Reduce 阶段的代码。
- 最后将 MyMapper 类和 MyReducer 类组装起来,构建 MapReduce 任务。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountJob { /** * 创建自定义mapper类 */ public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1,产生k2,v2 * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); // 迭代切割出来的单词数据 for(String word:words){ // 把迭代出来的单词封装成<k2,v2>的形式 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); // 输出k2,v2 context.write(k2,v2); } } } /** * 创建自定义的reducer类 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> { /** * 针对v2s的数据进行累加求和 * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //v2s {1,1,1,1} //创建一个sum变量,保存v2s的和 long sum = 0L; for(LongWritable v2:v2s){ sum += v2.get(); } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出结果 context.write(k3,v3); } } /** * 组装job = map + reduce */ public static void main(String[] args){ try{ if(args.length != 2){ //如果传递的参数不够,程序直接退出 System.exit(100); } // job需要配置的参数 // 创建一个配置对象 Configuration conf = new Configuration(); // 创建一个job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordCountJob.class); // 指定输入路径 FileInputFormat.setInputPaths(job,new Path(args[0])); // 指定输出路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); // 指定reduce相关的代码 job.setReducerClass(MyReducer.class); // 指定k3的类型 job.setOutputKeyClass(Text.class); // 指定v3的类型 job.setOutputValueClass(LongWritable.class); // 提交job job.waitForCompletion(true); }catch (Exception e){ e.printStackTrace(); } } }
4,对 MapReduce 任务打 Jar 包
(1)如果 MapReduce 代码开完毕后想要执行,则需要将其打为 Jar 包。此时需要在此项目对应的 pom.xml 文件中添加 Maven 的编译打包插件配置。
<build> <plugins> <!-- compiler 插件,设定 JDK 版本 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <encoding>UTF-8</encoding> <source>1.8</source> <target>1.8</target> <showWarnings>true</showWarnings> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
(2)然后执行打 Jar 包的操作:
(3)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。
5,准备测试数据
(1)我们登录服务器,创建一个 hello.txt
vi hello.txt
(2)文件内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(3)接着执行如下命令把测试数据上传至 HDFS:
hdfs dfs -put hello.txt /
6,向集群提交 MapReduce 任务
(1)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交 Jar 包:
注意:MapReduce 任务中指定的输出目录(/out)必须是一个之前不存在的目录,否则任务执行时会报错。
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob /hello.txt /out
(2)在将任务提交到集群后,可以在提交任务的命令行中看到如下日志信息。如果 map 执行到 100%,reduce 执行到 100%,则说明任务执行成功了。
2024-01-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0% 2024-01-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0% 2024-01-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%
(3)我们也可以使用浏览器访问 yarn 任务监控 web 页面,地址为 http://主节点IP:8088,查看任务执行情况:
(4)待任务结束后,我们可以执行如下命令查看任务输出的结果:
hdfs dfs -ls /out
- _SUCCESS 是一个标识文件,有这个文件表示这个任务执行成功了。
- part-r-00000 是具体的结果文件,如果有多个 Reduce Task,则会产生多个结果文件。多个文件会按照顺序编号:part-r-00001、part-r-00002 等。
(5)查看结果文件内容,可以看到具体的单词统计结果:
hdfs dfs -cat /out/part-r-00000
附:指定目录作为输入
1,将目录作为输入路径
(1)假设我们在 HDFS 的 /input 目录下有两个文件 hello1.txt,hello2.txt:
(2)如果我们希望 Hadoop 会读取该目录下的所有文件进行计算,只需要运行时将该目录路径作为第一个输入参数即可:
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output
(3)查看日志可以发现一共产生了 2 个 map 任务,说明两个文件都读取到了:
(4)查看结果数据,可以发现统计结果也是从两个文件中计算汇总而来:
2,递归遍历子目录
(1)Hadoop 默认情况下不会递归遍历子目录,如果我们的 /input 目录下还有个 2024 子目录,该子目录下有个 hello3.txt:
(2)当我们运行任务会发现,不仅 hello3.txt 这个文件没有读取到,并且整个任务会失败。
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob /input /output
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountJob { /** * 创建自定义mapper类 */ public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1,产生k2,v2 * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); // 迭代切割出来的单词数据 for(String word:words){ // 把迭代出来的单词封装成<k2,v2>的形式 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); // 输出k2,v2 context.write(k2,v2); } } } /** * 创建自定义的reducer类 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> { /** * 针对v2s的数据进行累加求和 * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //v2s {1,1,1,1} //创建一个sum变量,保存v2s的和 long sum = 0L; for(LongWritable v2:v2s){ sum += v2.get(); } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出结果 context.write(k3,v3); } } /** * 组装job = map + reduce */ public static void main(String[] args){ try{ if(args.length != 2){ //如果传递的参数不够,程序直接退出 System.exit(100); } // job需要配置的参数 // 创建一个配置对象 Configuration conf = new Configuration(); // 创建一个job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordCountJob.class); // 指定输入路径 FileInputFormat.setInputPaths(job,new Path(args[0])); // 启用递归目录遍历 FileInputFormat.setInputDirRecursive(job, true); // 指定输出路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); // 指定reduce相关的代码 job.setReducerClass(MyReducer.class); // 指定k3的类型 job.setOutputKeyClass(Text.class); // 指定v3的类型 job.setOutputValueClass(LongWritable.class); // 提交job job.waitForCompletion(true); }catch (Exception e){ e.printStackTrace(); } } }
全部评论(0)