SpringBoot - 并发框架Disruptor使用详解2(多生产者、多消费者、消费者依赖关系)
作者:hangge | 2020-04-08 08:10
三、多生产者、多消费者
1,多生产者
(1)下面测试样例在前文的基础上在增加一个生产者,即两个生产者、一个消费者。
注意:多生产者的时候要将 isMoreProducer 参数设置为 true,即 ProducerType 使用 ProducerType.MULTI。否则会出现数据丢失的情况。
public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
// 创建一个消费者
MyConsumer myConsumer = new MyConsumer("---->消费者1");
// 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者)
DisruptorQueue disruptorQueue = DisruptorQueueFactory.getHandleEventsQueue(4,
true, myConsumer);
// 创建两个生产者,开始模拟生产数据
MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
Thread t1 = new Thread(myProducerThread1);
t1.start();
MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue);
Thread t2 = new Thread(myProducerThread2);
t2.start();
// 执行3s后,生产者不再生产
Thread.sleep(3 * 1000);
myProducerThread1.stopThread();
myProducerThread2.stopThread();
}
}
(2)运行结果如下,可以看到整个过程两个生产者前后一共生产了 7 个元素,并由消费者消费掉:

2,多消费者
(1)下面代码在上面的基础上增加一个消费者(即一共两个消费者),同时通过 DisruptorQueueFactory.getHandleEventsQueue 创建“发布订阅”模式的操作队列,可以看到同一事件会被多个消费者并行消费:
public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
// 创建两个消费者
MyConsumer myConsumer1 = new MyConsumer("---->消费者1");
MyConsumer myConsumer2 = new MyConsumer("---->消费者2");
// 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者)
DisruptorQueue disruptorQueue = DisruptorQueueFactory.getHandleEventsQueue(4,
true, myConsumer1, myConsumer2);
// 创建两个生产者,开始模拟生产数据
MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
Thread t1 = new Thread(myProducerThread1);
t1.start();
MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue);
Thread t2 = new Thread(myProducerThread2);
t2.start();
// 执行3s后,生产者不再生产
Thread.sleep(3 * 1000);
myProducerThread1.stopThread();
myProducerThread2.stopThread();
}
}
(2)我们也可以通过 DisruptorQueueFactory.getWorkPoolQueue 方法创建“点对点”模式的操作队列,这样同一事件只会被一组消费者其中之一消费:
public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
// 创建两个消费者
MyConsumer myConsumer1 = new MyConsumer("---->消费者1");
MyConsumer myConsumer2 = new MyConsumer("---->消费者2");
// 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者)
DisruptorQueue disruptorQueue = DisruptorQueueFactory.getWorkPoolQueue(4,
true, myConsumer1, myConsumer2);
// 创建两个生产者,开始模拟生产数据
MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
Thread t1 = new Thread(myProducerThread1);
t1.start();
MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue);
Thread t2 = new Thread(myProducerThread2);
t2.start();
// 执行3s后,生产者不再生产
Thread.sleep(3 * 1000);
myProducerThread1.stopThread();
myProducerThread2.stopThread();
}
}
四、消费者依赖关系
1,简单的依赖
(1)下面是一个简单的消费者依赖关系,消费者 C3 消费时,必须保证消费者 C1 和消费者 C2 已经完成对该消息的消费。举个例子,在处理实际的业务逻辑(C3)之前,需要校验数据(C1),以及将数据写入磁盘(C2)。
(2)下面是样例代码:
public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
// 创建两个消费者
MyConsumer myConsumer1 = new MyConsumer("---->消费者C1");
MyConsumer myConsumer2 = new MyConsumer("---->消费者C2");
MyConsumer myConsumer3 = new MyConsumer("------->消费者C3");
// 创建一个Disruptor对象
Disruptor disruptor = new Disruptor(new ObjectEventFactory(),
4, Executors.defaultThreadFactory(), ProducerType.SINGLE,
new SleepingWaitStrategy());
// 设置消费者依赖关系(先让C1和C2消费,再让C3消费)
disruptor.handleEventsWith(myConsumer1, myConsumer2).then(myConsumer3);
// 创建一个Disruptor队列操作类对象
DisruptorQueue disruptorQueue = DisruptorQueueFactory.getQueue(disruptor);
// 创建一个生产者,开始模拟生产数据
MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
Thread t1 = new Thread(myProducerThread1);
t1.start();
// 执行3s后,生产者不再生产
Thread.sleep(3 * 1000);
myProducerThread1.stopThread();
}
}
2,复杂的消费者依赖关系
(1)下面是一个更加复杂的消费者依赖关系:- 消费者 C1B 消费时,必须保证消费者 C1A 已经完成对该消息的消费;
- 消费者 C2B 消费时,必须保证消费者 C2A 已经完成对该消息的消费;
- 消费者 C3 消费时,必须保证消费者 C1B 和 C2B 已经完成对该消息的消费。

public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
// 创建两个消费者
MyConsumer myConsumerC1A = new MyConsumer("---->消费者C1A");
MyConsumer myConsumerC1B = new MyConsumer("------->消费者C1B");
MyConsumer myConsumerC2A = new MyConsumer("---->消费者C2A");
MyConsumer myConsumerC2B = new MyConsumer("------->消费者C2B");
MyConsumer myConsumerC3 = new MyConsumer("----------->消费者C3");
// 创建一个Disruptor对象
Disruptor disruptor = new Disruptor(new ObjectEventFactory(),
4, Executors.defaultThreadFactory(), ProducerType.SINGLE,
new SleepingWaitStrategy());
// 设置消费者依赖关系
disruptor.handleEventsWith(myConsumerC1A, myConsumerC2A);
disruptor.after(myConsumerC1A).then(myConsumerC1B);
disruptor.after(myConsumerC2A).then(myConsumerC2B);
disruptor.after(myConsumerC1B, myConsumerC2B).then(myConsumerC3);
// 创建一个Disruptor队列操作类对象
DisruptorQueue disruptorQueue = DisruptorQueueFactory.getQueue(disruptor);
// 创建一个生产者,开始模拟生产数据
MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
Thread t1 = new Thread(myProducerThread1);
t1.start();
// 执行3s后,生产者不再生产
Thread.sleep(3 * 1000);
myProducerThread1.stopThread();
}
}
全部评论(0)