本文简单分析下Disruptor的原理
简介
Disruptor是一个高性能队列,它是系统内部的内存队列,而不是Kafka这样的分布式队列。
由于Java内置的队列,会出现加锁和伪共享等影响性能的问题,所以公司项目里使用了Disruptor框架。
Disruptor采用生产者-消费者模式,并使用环形数组结构,无锁设计,拥有很高的性能。
环形数组队列
先介绍下 Disruptor 的环形数组 RingBuffer
public static final long INITIAL_CURSOR_VALUE = -1L;
private final int indexMask;
private final Object[] entries;
private final int bufferSize;
private final Sequencer sequencer;
这是 RingBuffer 类中的变量,比较重要的变量:
- entries 环形数组 生产者生产的类就放在这个数组中
- bufferSize 环形数组的大小
- sequencer 序列号 用于事件发布者和事件处理者在ringbuffer上相互追逐,标记它们的相对位置
next()
public long next() {
return this.sequencer.next();
}
返回下一个可用的序列号
get()
public E get(long sequence) {
return this.entries[(int)sequence & this.indexMask];
}
返回生产者生产的消息,消息对象里面的内容是空的,需要指定值
publish()
public void publish(long sequence) {
this.sequencer.publish(sequence);
}
发布消费者可用序列,只有发布了,消费者才能看见。
序列
Sequence 是 Disruptor 中的序列类,主要用于生成序列号
在 Sequence 类中,可以看到避免伪共享的相关代码,主要就是 long 类型的,使用了长度为16的 long 类型的数组进行填充,这样可以有效的避免伪共享。
也采用了CAS相关操作,可以提高性能。
public class Sequence {
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long VALUE_OFFSET;
private final long[] paddedValue;
public Sequence() {
this(-1L);
}
public Sequence(long initialValue) {
this.paddedValue = new long[15];
UNSAFE.putOrderedLong(this.paddedValue, VALUE_OFFSET, initialValue);
}
public long get() {
return UNSAFE.getLongVolatile(this.paddedValue, VALUE_OFFSET);
}
public void set(long value) {
UNSAFE.putOrderedLong(this.paddedValue, VALUE_OFFSET, value);
}
public void setVolatile(long value) {
UNSAFE.putLongVolatile(this.paddedValue, VALUE_OFFSET, value);
}
public boolean compareAndSet(long expectedValue, long newValue) {
return UNSAFE.compareAndSwapLong(this.paddedValue, VALUE_OFFSET, expectedValue, newValue);
}
public long incrementAndGet() {
return this.addAndGet(1L);
}
public long addAndGet(long increment) {
long currentValue;
long newValue;
do {
currentValue = this.get();
newValue = currentValue + increment;
} while(!this.compareAndSet(currentValue, newValue));
return newValue;
}
public String toString() {
return Long.toString(this.get());
}
static {
int base = UNSAFE.arrayBaseOffset(long[].class);
int scale = UNSAFE.arrayIndexScale(long[].class);
VALUE_OFFSET = (long)(base + scale * 7);
}
}
Sequencer 接口,它的很多功能是提供给事件发布者使用的。SequenceBarrier 是给事件处理者使用的。
next()
采用自旋CAS的方式,获取下一个序列。
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
} else {
long current;
long next;
do {
while(true) {
current = this.cursor.get();
next = current + (long)n;
long wrapPoint = next - (long)this.bufferSize;
long cachedGatingSequence = this.gatingSequenceCache.get();
if (wrapPoint <= cachedGatingSequence && cachedGatingSequence <= current) {
break;
}
long gatingSequence = Util.getMinimumSequence(this.gatingSequences, current);
if (wrapPoint > gatingSequence) {
LockSupport.parkNanos(1L);
} else {
this.gatingSequenceCache.set(gatingSequence);
}
}
} while(!this.cursor.compareAndSet(current, next));
return next;
}
}
处理事件
几个重要的类:
- WorkProcessor:此类是事件处理类,实现了 Runnable 接口
- WorkPool:对处理序列 Sequence 和处理类 WorkProcessor 的封装类
- WorkerPoolInfo:实现了 ConsumerInfo 类,对 WorkerPool 和SequenceBarrier 的封装类
- ConsumerRepository:消费仓库
处理流程:
- 调用 disruptor.handleEventsWithWorkerPool 初始化 Disruptor 时,会向 consumerRepository 消费仓库中添加 WorkerPoolInfo (包装了WorkerPool (将 ringBuffer 对象传入)和 SequenceBarrier)
- 调用 disruptor.start 时,会从consumerRepository 消费仓库中取出 ConsumerInfo 即 WorkerPoolInfo,并调用其 start 方法
- WorkerPoolInfo 的 start 方法 会调用当前 workerPool 的 start 方法,WorkerPool 的 start 方法会使用传进来的线程池去执行 WorkProcessor
- 在 WorkProcessor 的 run 方法中会调用 WorkHandler 的 onEvent 方法,即自己定义的消费者类
参考:
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付