YARN - 多资源队列的配置和使用教程(附wordcount任务提交样例)
作者:hangge | 2024-08-28 08:34
一、YARN 中的调度器说明
1,为什么需要调度器?
(1)我们集群的资源是有限的,在实际工作中会有很多人向集群中提交任务,那么这个时候调度器就会决定资源如何分配。
(2)又比如我们提交了一个很占资源的任务,如果这一个任务就把集群中 90% 的资源都占用了,那么后面别人再提交任务,剩下的资源就不够用了。这时候新的任务是等前面的任务执行完了再执行?还是说把我们的资源匀出来一些分给它,我们少占用一些,让它也能慢慢的开始执行?这个也是由调度器负责的。
2,YARN 中的三种调度器
(1)FIFO Scheduler:先进先出(first in,first out)调度策略
- 先进先出策略大家都是排队的,如果你的任务申请不到足够的资源,那就等着,等前面的任务执行结束释放了资源之后你再执行。
- 这种策略在有些时候是不合理的,因为我们有一些任务的优先级比较高,我们希望任务提交上去立刻就开始执行,这个就实现不了了。
(2)Capacity Scheduler:FIFO Scheduler 的多队列版本
- 它是 FIFO Scheduler 的多队列版本,就是我们先把集群中的整块资源划分成多份,我们可以人为的给这些资源定义使用场景,例如下图里的 queue A 运行普通的任务,queue B 中运行优先级比较高的任务。这两个队列的资源是相互独立的。
- 但是注意一点,队列内部还是按照先进先出的规则。
(3)FairScheduler:多队列,多用户共享资源
- 支持多个队列,每个队列可以配置一定的资源,每个队列中的任务共享其所在队列的所有资源,不需要排队等待资源。
- 具体是这样的,假设我们向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源,当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用。

3,查看调度器类型
(1)在实际工作中我们一般都是使用第二种,即 CapacityScheduler。并且从 hadoop2 开始,CapacityScheduler 也是集群中的默认调度器。
(2)我们到集群上看一下,点击左侧的 Scheduler 查看:
- Capacity,这个是集群的调度器类型。
- 下面的 root 是根的意思,他下面目前只有一个队列,叫 default,我们之前提交的任务都会进入到这个队列中。

二、YARN 多资源队列配置和使用
1,需求说明
假设为了满足企业合理利用大数据集群中的资源,需要将离线任务和实时任务进行隔离。因此,需要在 YARN 中将资源划分为以下 2 个队列。
- Offline 队列:在此队列中运行离线任务。
- Online 队列:在此队列中运行实时任务。
提示:在实际工作中,一般我们先将资源划分成 offline 队列和 online 队列,随着集群规模的扩大和业务需求的增加,又增加了多个队列,再对集群资源做更细致的划分。
2,修改 YARN 集群配置文件
(1)我们需要修改集群中的 capacity-scheduler.xml 配置文件,该文件在 Hadoop 集群安装目录的“etc/Hadoop”目录下:
cd /usr/local/hadoop/etc/hadoop vi capacity-scheduler.xml
(2)配置文件中修改或添加相关内容:
- 新增 online、offline 队列
- 修改原有 default 队列资源设置
- 新增 online、offline 队列资源设置
提示:这里的 default 队列是默认队列,必须保留。额外增加了 ofline 队列和 online 队列。这 3 个队列的资源比例为 7:1:2。
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,online,offline</value>
<description>
The queues at the this level (root is the root queue).
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>70</value>
<description>defualt 队列 10%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.capacity</name>
<value>10</value>
<description>online 队列 10%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.capacity</name>
<value>20</value>
<description>offline 队列 20%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>70</value>
<description>defualt 队列可使用的资源上限</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
<value>10</value>
<description>online 队列可使用的资源上限</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
<value>20</value>
<description>offline 队列可使用的资源上限</description>
</property>
(3)然后,把修改好的配置文件同步到另外两个节点上:
scp -rq capacity-scheduler.xml node2:/usr/local/hadoop/etc/hadoop/ scp -rq capacity-scheduler.xml node3:/usr/local/hadoop/etc/hadoop/
(4)修改配置之后,需要重启集群才能生效。
sbin/stop-all.sh sbin/start-all.sh
3,验证效果
进入 YARN 的 Web 界面,查看最新的队列信息。可以发现除了默认的 default 队列,又增加了 offline 和 online 这两个队列。

附:向指定队列提交任务
1,创建任务 Jar 包
(1)这里我们要将使用 MapReduce 开发的 WordCount 代码提交到 offline 队列中,具体的代码和编译打包步骤可以参考我之前写的文章:
(2)由于在对 MapReduce 程序手工指定资源队列时,需要修改已有代码。下面高亮部分是修改的内容:
注意:对于 Spark、Flink 程序,不需要修改已有代码,直接通过对应的参数指定队列名称即可。
public class WordCountJob {
/**
* 创建自定义mapper类
*/
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
/**
* 需要实现map函数
* 这个map函数就是可以接收k1,v1,产生k2,v2
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表的是每一行的行首偏移量,v1代表的是每一行内容
// 对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
// 迭代切割出来的单词数据
for(String word:words){
// 把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
// 输出k2,v2
context.write(k2,v2);
}
}
}
/**
* 创建自定义的reducer类
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
/**
* 针对v2s的数据进行累加求和
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//v2s {1,1,1,1}
//创建一个sum变量,保存v2s的和
long sum = 0L;
for(LongWritable v2:v2s){
sum += v2.get();
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出结果
context.write(k3,v3);
}
}
/**
* 组装job = map + reduce
*/
public static void main(String[] args){
try{
// if(args.length != 2){
// //如果传递的参数不够,程序直接退出
// System.exit(100);
// }
// job需要配置的参数
// 创建一个配置对象
Configuration conf = new Configuration();
//解析命令行中-D后面传递过来的参数,添加到conf中
String[] remainingArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
// 创建一个job
Job job = Job.getInstance(conf);
// 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
job.setJarByClass(WordCountJob.class);
// 指定输入路径
FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
// 指定输出路径
FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
// 指定map相关的代码
job.setMapperClass(MyMapper.class);
// 指定k2的类型
job.setMapOutputKeyClass(Text.class);
// 指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
// 指定reduce相关的代码
job.setReducerClass(MyReducer.class);
// 指定k3的类型
job.setOutputKeyClass(Text.class);
// 指定v3的类型
job.setOutputValueClass(LongWritable.class);
// 提交job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
2,提交任务
(1)将打包后的 jar 包上传到服务器,执行命令向集群中提交任务。注意我这里通过 mapreduce.job.queuename 参数指定了使用的队列:
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.job.queuename=offline /hello.txt /out
(2)到 YARN 中查看任务所在的队列,可以看到确实在我们指定的 offline 队列上面执行:

全部评论(0)