Hadoop - 小文件问题的解决方案2(MapReduce读取SequenceFile进行计算)
作者:hangge | 2024-08-15 08:32
前文我演示了如何将小文件合并成 SequenceFile 进行存储,本文我接着演示如何通过 MapReduce 读取 SequenceFile 进行计算。

(2)接着运行这个任务:
(3)执行成功以后查看结果,可以看到统计正确:
1,样例代码
(2)要让 MapReduce 可以读取 SequenceFile,只需要修改两个地方:
- 修改 map 中 k1 的数据类型为 Text 类型
- 修改 job 中的设置输入数据处理类为 SequenceFileInputFormat
public class WordCountJob {
/**
* 创建自定义mapper类
*/
public static class MyMapper extends Mapper<Text,Text,Text,LongWritable>{
/**
* 需要实现map函数
* 这个map函数就是可以接收k1,v1,产生k2,v2
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(Text k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表的是每一行的行首偏移量,v1代表的是每一行内容
// 对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
// 迭代切割出来的单词数据
for(String word:words){
// 把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
// 输出k2,v2
context.write(k2,v2);
}
}
}
/**
* 创建自定义的reducer类
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
/**
* 针对v2s的数据进行累加求和
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//v2s {1,1,1,1}
//创建一个sum变量,保存v2s的和
long sum = 0L;
for(LongWritable v2:v2s){
sum += v2.get();
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出结果
context.write(k3,v3);
}
}
/**
* 组装job = map + reduce
*/
public static void main(String[] args){
try{
if(args.length != 2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
// job需要配置的参数
// 创建一个配置对象
Configuration conf = new Configuration();
// 创建一个job
Job job = Job.getInstance(conf);
// 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
job.setJarByClass(WordCountJob.class);
// 指定输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 指定输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 指定map相关的代码
job.setMapperClass(MyMapper.class);
// 指定k2的类型
job.setMapOutputKeyClass(Text.class);
// 指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//设置输入数据处理类
job.setInputFormatClass(SequenceFileInputFormat.class);
// 指定reduce相关的代码
job.setReducerClass(MyReducer.class);
// 指定k3的类型
job.setOutputKeyClass(Text.class);
// 指定v3的类型
job.setOutputValueClass(LongWritable.class);
// 提交job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
2,运行测试

hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /seqFile /output
(3)执行成功以后查看结果,可以看到统计正确:
(4)此时到 yarn 的 web 界面上查看 map 任务的个数,发现只有 1 个,说明最终只读取一个 SequenceFile,代码确实生效了。
全部评论(0)