Java - 并发队列使用详解2(非阻塞队列:ConcurrentLinkedQueue)
作者:hangge | 2020-03-30 08:10
在并发队列上 JDK 提供了两套实现: 一个是以 ConcurrentLinkedQueue 为代表的高性能队列,一个是以 BlockingQueue 接口为代表的阻塞队列。无论哪种都继承自 Queue,并且都是线程安全的,都可以进行并发编程。 我在前文中通过样例介绍了阻塞队列(点击查看),接下来介绍非阻塞队列。
(2)接着创建一个消费者,代码如下。我们使用 poll() 方法获取元素,同时每次获取到元素之后会等待个 1 秒钟,模拟实际业务处理耗时,也便于观察队列情况。
(3)最后分别创建两个生产者以及一个消费者进行测试,并且 3 秒种之后通知生产者线程退出。
(4)运行结果如下,可以看到整个过程两个生产者前后一共生产了 6 个元素,并由消费者消费掉:
二、非阻塞队列
1,ConcurrentLinkedQueue 介绍
- ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列。
- 该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,也就是插入的时候往尾部插,取出元素从头部取。
- 该队列不允许 null 元素。
- ConcurrentLinkedQueue 通过无锁的方式,即内部是遵循 CAS(比较并交换)的方式来实现,实现了高并发状态下的高性能,因此通常 ConcurrentLinkedQueue 的性能好于 BlockingQueue(ArrayBlockingQueue 一把锁,LinkedBlockingQueue 存取两把锁)
2,ConcurrentLinkedQueue 重要方法
(1)插入、获取元素
- add() 和 offer():都是加入元素的方法(在 ConcurrentLinkedQueue 中这俩个方法没有任何区别,add 方法内部调用的就是 offer 方法)
- poll() 和 peek():都是取头元素节点,区别在于前者会删除元素,后者不会。
(2)判断是否为空、获取个数
- size() 方法返回队列元素个数,由于该方法需要遍历一遍集合的,效率很慢,所以尽量要避免用 size。此外,如果在执行期间添加或删除元素。对于此方法,返回的结果可能不准确。
- isEmpty() 方法可以判断队列是否为空,由于 size() 比较慢,所以判断队列是否为空最好用 isEmpty() 而不是用 size 来判断。
附:使用 ConcurrentLinkedQueue 模拟生产者与消费者
(1)首先我们创建一个生产者,代码如下。我们使用 offer() 方法插入元素,同时每次插入元素之后会等待个 1 秒钟,便于观察队列运行情况:public class ProducerThread implements Runnable {
private String name;
private ConcurrentLinkedQueue queue;
private volatile boolean flag = true;
private static AtomicInteger count = new AtomicInteger();
public ProducerThread(String name, ConcurrentLinkedQueue queue) {
this.name = name;
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(now() + this.name + ":线程启动。");
while (flag) {
String data = count.incrementAndGet()+"";
// 将数据存入队列中
queue.offer(data);
System.out.println(now() + this.name + ":存入" + data + "到队列中。");
//等待1秒钟
Thread.sleep(1000);
}
} catch (Exception e) {
} finally {
System.out.println(now() + this.name + ":退出线程。");
}
}
public void stopThread() {
this.flag = false;
}
// 获取当前时间(分:秒)
public String now() {
Calendar now = Calendar.getInstance();
return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] ";
}
}
(2)接着创建一个消费者,代码如下。我们使用 poll() 方法获取元素,同时每次获取到元素之后会等待个 1 秒钟,模拟实际业务处理耗时,也便于观察队列情况。
public class ConsumerThread implements Runnable {
private String name;
private ConcurrentLinkedQueue<String> queue;
private volatile boolean flag = true;
public ConsumerThread(String name, ConcurrentLinkedQueue queue) {
this.name = name;
this.queue = queue;
}
@Override
public void run() {
System.out.println(now() + this.name + ":线程启动。");
try {
while (flag) {
System.out.println(now() + this.name + ":正在从队列中获取数据......");
String data = queue.poll();
System.out.println(now() + this.name + ":拿到队列中的数据:" + data);
//等待1秒钟
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(now() + this.name + ":消退出线程。");
}
}
// 获取当前时间(分:秒)
public String now() {
Calendar now = Calendar.getInstance();
return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] ";
}
}
(3)最后分别创建两个生产者以及一个消费者进行测试,并且 3 秒种之后通知生产者线程退出。
public class ProducerAndConsumer {
public static void main(String[] args) throws InterruptedException {
// 创建一个ConcurrentLinkedQueue类型的非阻塞队列
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// 创建两个生产者和一一个消费者
ProducerThread producerThread1 = new ProducerThread("11111生产者1", queue);
ProducerThread producerThread2 = new ProducerThread("22222生产者2", queue);
ConsumerThread consumerThread1 = new ConsumerThread("---->消费者1", queue);
Thread t1 = new Thread(producerThread1);
Thread t2 = new Thread(producerThread2);
Thread c1 = new Thread(consumerThread1);
t1.start();
t2.start();
c1.start();
// 执行3s后,生产者不再生产
Thread.sleep(3 * 1000);
producerThread1.stopThread();
producerThread2.stopThread();
}
}
(4)运行结果如下,可以看到整个过程两个生产者前后一共生产了 6 个元素,并由消费者消费掉:
全部评论(0)