返回 导航

其他

hangge.com

Java - 并发队列使用详解1(阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue等)

作者:hangge | 2020-03-29 08:10
    在并发队列上 JDK 提供了两套实现: 一个是以 ConcurrentLinkedQueue 为代表的高性能队列,一个是以 BlockingQueue 接口为代表的阻塞队列。无论哪种都继承自 Queue,并且都是线程安全的,都可以进行并发编程。 本文先介绍阻塞队列并通过样例进行演示。

一、阻塞队列介绍

1,什么是阻塞队列?

(1)阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是: 
  • 在队列为空时,获取元素的线程会等待队列变为非空。 
  • 当队列满时,存储元素的线程会等待队列可用。 
(2)阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

2,阻塞队列有哪些?

(1)ArrayBlockingQueue
  • 一个由数组结构组成的有界阻塞队列。
  • 在创建 ArrayBlockingQueue 对象时必须指定容量大小。
  • 并且创建时可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
  • 它是一个先进先出队列。

(2)LinkedBlockingQueue
  • 一个由链表结构组成的有界阻塞队列。
  • 在创建 LinkedBlockingQueue 对象时可以指定容量大小。如果不指定容量大小,则默认大小为 Integer.MAX_VALUE
  • 它是一个先进先出队列。

(3)PriorityBlockingQueue
  • 一个支持优先级排序的无界阻塞队列,即容量没有上限。
  • 它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。
  • 所有插入 PriorityBlockingQueue 的对象必须实现 java.lang.Comparable 接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。
  • PriorityBlockingQueue 中允许插入 null 对象。 

(4)DelayQueue
  • 一个使用优先级队列实现的无界阻塞队列,它是基于 PriorityQueue
  • DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。
  • DelayQueue 也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

(5)SynchronousQueue
  • 一个不存储元素的阻塞队列。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

(6)LinkedTransferQueue
  • 一个由链表结构组成的无界阻塞队列。

(7)LinkedBlockingDeque
  • 一个由链表结构组成的双向阻塞队列。

3,阻塞队列的几个常用方法

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e)
  • 队列未满时,返回 true
  • 队列满则抛出异常
offer(e)
  • 队列未满时,返回 true
  • 队列满时返回 false。非阻塞立即返回。
put(e)
  • 队列未满时,直接插入没有返回值;
  • 队列满时会阻塞等待,一直等到队列未满时再插入。
offer(e,time,unit)
  • 设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回 false
  • 插入成功返回 true
移除方法 remove()
  • 队列不为空时,返回队首值并移除
  • 队列为空时抛出异常
poll()
  • 队列不为空时返回队首值并移除
  • 队列为空时返回 null。非阻塞立即返回。
take()
  • 队列不为空返回队首值并移除
  • 当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。
poll(time,unit)
  • 设定等待的时间,如果在指定时间内队列还未孔则返回 null
  • 不为空则返回队首值
检查方法 element() peek() 不可用 不可用

附:使用 BlockingQueue 模拟生产者与消费者

1,一直阻塞方式

(1)首先我们创建一个生产者,代码如下。我们使用 put() 方法插入元素,当队列未满时,该方法会直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
提示put() 方法的实现原理是先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用 notFull.await() 进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。当被其他线程唤醒时,通过 enqueue(e) 方法插入元素,最后解锁。
public class ProducerThread implements Runnable {
    private String name;
    private BlockingQueue queue;
    private volatile boolean flag = true;
    private static AtomicInteger count = new AtomicInteger();

    public ProducerThread(String name, BlockingQueue queue) {
        this.name = name;
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(now() + this.name + ":线程启动。");
            while (flag) {
                String data = count.incrementAndGet()+"";
                System.out.println(now() + this.name + ":开始存入" + data + "到队列中......");
                // 将数据存入队列中
                queue.put(data);
                System.out.println(now() + this.name + ":成功存入" + data + "到队列中。");
            }
        } catch (Exception e) {

        } finally {
            System.out.println(now() + this.name + ":退出线程。");
        }
    }

    public void stopThread() {
        this.flag = false;
    }

    // 获取当前时间(分:秒)
    public String now() {
        Calendar now = Calendar.getInstance();
        return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] ";
    }
}

(2)接着创建一个消费者,代码如下。我们使用 take() 方法获取元素,当队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。同时每次获取到元素之后会等待个 1 秒钟,模拟实际业务处理耗时,也便于观察队列情况。
提示take() 方法在获取元素时加锁,生产者无法操作 queue,取到元素并移除元素,然后释放锁。
public class ConsumerThread implements Runnable {
    private String name;
    private BlockingQueue<String> queue;
    private volatile boolean flag = true;

    public ConsumerThread(String name, BlockingQueue<String> queue) {
        this.name = name;
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println(now() + this.name + ":线程启动。");
        try {
            while (flag) {
                System.out.println(now() + this.name + ":正在从队列中获取数据......");
                String data = queue.take();
                System.out.println(now() + this.name + ":拿到队列中的数据:" + data);
                //等待1秒钟(模拟实际业务处理)
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(now() + this.name + ":退出线程。");
        }
    }

    // 获取当前时间(分:秒)
    public String now() {
        Calendar now = Calendar.getInstance();
        return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] ";
    }
}

(3)最后分别创建两个生产者以及一个消费者进行测试,并且 3 秒种之后通知生产者线程退出。
public class ProducerAndConsumer {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个ArrayBlockingQueue类型的阻塞队列(容量大小为3,公平性为true)
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        // 创建两个生产者和一一个消费者
        ProducerThread producerThread1 = new ProducerThread("11111生产者1", queue);
        ProducerThread producerThread2 = new ProducerThread("22222生产者2", queue);
        ConsumerThread consumerThread1 = new ConsumerThread("---->消费者1", queue);
        Thread t1 = new Thread(producerThread1);
        Thread t2 = new Thread(producerThread2);
        Thread c1 = new Thread(consumerThread1);
        t1.start();
        t2.start();
        c1.start();

        // 执行3s后,生产者不再生产
        Thread.sleep(3 * 1000);
        producerThread1.stopThread();
        producerThread2.stopThread();
    }
}

(4)运行结果如下,可以看到整个过程两个生产者前后一共生产了 8 个元素,并由消费者消费掉:

2,超时退出方式

(1)我们对生产者代码稍作改变,这次我们使用 offer() 方法插入元素,并设定等待的时间,当队列未满时,该方法会直接插入然后返回 true;队列满时会阻塞等待设定的时间,如果在指定时间内还不能往队列中插入数据则返回 false(不会一直阻塞下去)
public class ProducerThread implements Runnable {
    private String name;
    private BlockingQueue queue;
    private volatile boolean flag = true;
    private static AtomicInteger count = new AtomicInteger();

    public ProducerThread(String name, BlockingQueue queue) {
        this.name = name;
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(now() + this.name + ":线程启动。");
            while (flag) {
                String data = count.incrementAndGet()+"";
                System.out.println(now() + this.name + ":开始存入" + data + "到队列中......");
                // 将数据存入队列中
                boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(now() + this.name + ":成功存入" + data + "到队列中。");
                } else {
                    System.out.println(now() + this.name + ":存入" + data + "失败!!!");
                }
            }
        } catch (Exception e) {

        } finally {
            System.out.println(now() + this.name + ":退出线程。");
        }
    }

    public void stopThread() {
        this.flag = false;
    }

    // 获取当前时间(分:秒)
    public String now() {
        Calendar now = Calendar.getInstance();
        return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] ";
    }
}

(2)对消费者代码同样稍作改变,这次我们使用 poll() 方法获取 元素,并设定等待的时间,当队列不为空时,会返回队首值;当队列为空时阻塞等待设定的时间,如果在指定时间内队列还未空则返回 null(不会一直阻塞下去)
public class ConsumerThread implements Runnable {
    private String name;
    private BlockingQueue<String> queue;
    private volatile boolean flag = true;

    public ConsumerThread(String name, BlockingQueue<String> queue) {
        this.name = name;
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println(now() + this.name + ":线程启动。");
        try {
            while (flag) {
                System.out.println(now() + this.name + ":正在从队列中获取数据......");
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (data != null) {
                    System.out.println(now() + this.name + ":拿到队列中的数据:" + data);
                    //等待1秒钟
                    Thread.sleep(1000);
                } else {
                    System.out.println(now() + this.name + ":超过2秒未获取到数据!!!");
                    flag = false;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(now() + this.name + ":消退出线程。");
        }
    }

    // 获取当前时间(分:秒)
    public String now() {
        Calendar now = Calendar.getInstance();
        return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] ";
    }
}

(3)测试代码 ProducerAndConsumer.java 内容同之前一样没有变化,运行结果如下,可以看到整个过程两个生产者前后一共生产了 8 个元素(但其中有 2 个元素因为超时没有进入队列),消费者最终一个消费了 6 个元素:
评论

全部评论(0)

回到顶部