Disruptor-高性能队列

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第4天,点击查看活动详情

笔者在之前的文章中介绍过 RingBuffer 的一些理论知识和自己动手实现了一个简单的 RingBuffer 。而 Disruptor 就实现了一个高性能的 RingBuffer。下面对这个项目进行分析和一些简单的使用。用来解决笔者写的一个分布式ID生成器 rain 中雪花算法利用RingBuffer的缓存问题免得自己造轮子。

1. 背景

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

目前包括有Apache Storm、Camel、Log4j2在内的多个知名的项目使用了Disruptor,这也同时说明了Disruptor的高可用和高质量。

2. Disruptor介绍

Disruptor 是一个提供并发环形缓冲区数据结构的库。它旨在在异步事件处理架构中提供低延迟、高吞吐量的工作队列。本质上还是一个队列

2.1 核心概念

  • Ring Buffer:Ring Buffer 通常被认为是 Disruptor 的主要方面。但是,从 3.0 开始,Ring Buffer 只负责存储和更新Event通过 Disruptor 的数据。对于一些高级用例,它甚至可以完全由用户替换。
  • Sequence:Disruptor 使用Sequences 作为识别特定组件在哪里的一种手段。每个消费者(事件处理器)Sequence都像 Disruptor 本身一样维护一个。大多数并发代码依赖于这些序列值的移动,因此Sequence支持许多当前的特性AtomicLong。事实上,两者之间唯一真正的区别是Sequence包含额外的功能来防止Sequences 和其他值之间的错误共享。
  • Sequencer:Sequencer 是 Disruptor 的真正核心。该接口的两种实现(单生产者、多生产者)实现了所有并发算法,以在生产者和消费者之间快速、正确地传递数据。
  • Sequence Barrier:Sequencer 生成一个 Sequence Barrier,其中包含对Sequence从 Sequencer 发布的 main 和Sequence任何依赖消费者的 s 的引用。它包含确定是否有任何事件可供消费者处理的逻辑。
  • Wait Strategy:等待策略决定了消费者将如何等待生产者将事件放入 Disruptor。有关可选无锁的部分提供了更多详细信息。
  • Event:从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。
  • Event Processor:用于处理来自 Disruptor 的事件的主事件循环,并拥有消费者序列的所有权。有一个称为 BatchEventProcessor 的表示,它包含事件循环的有效实现,并将回调到 EventHandler 接口的使用提供的实现。
  • Event Handler:由用户实现的接口,代表 Disruptor 的消费者。
  • Producer:这是调用 Disruptor 入队Event的用户代码。这个概念在代码中也没有表示。

示意图如下:

image.png

3. 示例

maven依赖引入:

  1. <dependency>
  2. <groupId>com.lmax</groupId>
  3. <artifactId>disruptor</artifactId>
  4. <version>4.0.0.RC1</version>
  5. </dependency>

3.1 消费和生产

  1. /**
  2. * @author mxsm
  3. * @date 2022/5/28 10:58
  4. * @Since 1.0.0
  5. */
  6. public class UidEvent {
  7. private long value;
  8. public void set(long value) {
  9. this.value = value;
  10. }
  11. }

然后构造一个EventFactory:

  1. /**
  2. * @author mxsm
  3. * @date 2022/5/28 11:00
  4. * @Since 1.0.0
  5. */
  6. public class UidEventFactory implements EventFactory<UidEvent> {
  7. @Override
  8. public UidEvent newInstance() {
  9. return new UidEvent();
  10. }
  11. }

定义一个消息处理器:

  1. /**
  2. * @author mxsm
  3. * @date 2022/5/28 14:55
  4. * @Since 1.0.0
  5. */
  6. public class UidEventHandler implements EventHandler<UidEvent> {
  7. @Override
  8. public void onEvent(UidEvent event, long sequence, boolean endOfBatch) {
  9. System.out.println(event.getValue());
  10. }
  11. }

编写一个启动类:

  1. public class UidEventMain {
  2. public static void main(String[] args) throws Exception
  3. {
  4. int bufferSize = 1024;
  5. Disruptor<UidEvent> disruptor =
  6. new Disruptor<>(UidEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
  7. disruptor.handleEventsWith(new UidEventHandler());
  8. disruptor.start();
  9. RingBuffer<UidEvent> ringBuffer = disruptor.getRingBuffer();
  10. ByteBuffer bb = ByteBuffer.allocate(8);
  11. for (long l = 0; ; l++)
  12. {
  13. bb.putLong(0, l);
  14. ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
  15. Thread.sleep(1000);
  16. }
  17. }
  18. }

运行结果:

disruptor-test.gif

3. disruptor性能测试

多生产者和单生产者的性能对比(图片来自官网):

image.png

在官网还对比了java.util.concurrent.ArrayBlockingQueueDisruptor 的性能。 生产者和消费模式如下图(图来自官网):

image.png

两者对比图:

image.png

从上图可以看出来Disruptor 的性能比 ABQ 高出一个数量级。

延迟表现:

image.png

从官网和使用了Disruptor 的项目都提供了很大的性能提升。

参考文档:


文章标签:

原文连接:https://juejin.cn/post/7102686333264461832

相关推荐

Flask框架——消息闪现

34个图片压缩工具集合,包含在线压缩和CLI工具

入门即享受!coolbpf 硬核提升 BPF 开发效率 | 龙蜥技术

基于 OPLG 从 0 到 1 构建统一可观测平台实践

全链路灰度在数据库上我们是怎么做的?

冴羽答读者问:过程比结果重要吗?如果是,怎么理解?如果不是,又怎么解?

接口文档管理工具,选yapi 还是 Apifox? 这里列出了两款软件的深度分析,看完再下载不迟。

基于 Docker 来部署 Vue 或 React 前端项目及 Node 后端服务

三十岁的我,自由了!

如何实现带timeout的input?

统计千行代码Bug率,有没有意义?

814. 二叉树剪枝 : 简单递归运用题

【综合笔试题】难度 3.5\u002F5,多解法热门二叉树笔试题

为什么设计的软件不好用?那是因为不熟悉软件开发模型!一文熟悉软件开发模型

作为前端,我是这样从零实现CI\u002FCD二(node服务部署及前后端联调)

极智开发 | 讲解 Nginx 特性之一:反向代理

Netty 案例之 IM 方案设计

从 Google 离职,前Go 语言负责人跳槽小公司

最终一致性性分布式事务 TCC

不谈源码,聊聊位运算的实际应用