【杂谈】Disruptor——RingBuffer问题整理(一)
纯CAS为啥比加锁要快?
同样是修改数据,一个采用加锁的方式保证原子性,一个采用CAS的方式保证原子性。
都是能够达到目的的,但是常用的锁(例如显式的Lock和隐式的synchonized),都会把获取不到锁的线程挂起,相对于CAS的不挂起,多了挂起和唤醒的开销。
尾指针是如何管理的,如何防止覆盖旧数据?
别的帖子都说RingBuffer中不维护尾指针,尾指针由消费者维护(所谓维护指针,就是修改、移动指针)其实这一句话有点误导性,如果RingBuffer不知道尾部在哪里,那它的数据存储肯定就会出问题,例如把还没消费过的数据给覆盖了。
确实,消费者会自行维护自己的消费指针,RingBuffer也不会去干涉消费者指针的维护,但是它会引用所有消费者的指针,读取他们的值,以此作为“尾部”的判断依据。实际上就是最慢的那个消费者为准。
注:消费者指针是消费者消费过的最后一条数据的序号
我们直接来看代码,这个是RingBuffer的publishEvent方法,我们看到,它首先取得一个可用的序列号,然后再将数据放入该序列号的对应位置中。我们来看看这个序列号是如何取得的。
@Override public void publishEvent(EventTranslator<E> translator) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence); }
我们先看Sequencer的SingleProducerSequencer实现。这里就是判断如果生产者新指针的位置是否会超过尾部,如果超过尾部就挂起等待。注意这里的等待方式也是自旋方式,只不过,每次失败后都会自行挂起片刻。
这里附上几个图可能更好理解:(画图工具不太好,无法通过单元格上色的方式体现空闲情况)
情况1:队列已满,生产者尝试使用新序号14,但由于(14 – 8 = 6),由于最慢的消费者目前消费的最后一条数据的序号是5,5号之后的数据还没被消费,6 > 5,所以序号14还不能用。生产者线程挂起,下次再次尝试。
情况2:消费者1消费了序号6的数据。(14 – 8 = 6) 不大于 6,这时序号14可用,生产者得到可用的序号。
@Override public long next() { return next(1); } /** * @see Sequencer#next(int) */ @Override public long next(int n) { if (n < 1 || n > bufferSize) { throw new IllegalArgumentException("n must be > 0 and < bufferSize"); } long nextValue = this.nextValue; //当前RingBuffer的游标,即生产者的位置指针 long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; //减掉一圈 long cachedGatingSequence = this.cachedValue; //上一次缓存的最小的消费者指针 //条件1:生产者指针的位置超过当前消费最小的指针 //条件2:为特殊情况,这里先不考虑,详见: if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; //再次遍历所有消费者的指针,确认是否超过 //如果超过,则等待 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; }
另外对于多生产者的情况,在不会越界的情况下,需要通过CAS来保证获取序号的原子性。具体可以查看MultiProducerSequencer的next方法。
消费者指针是如何读取的?
RingBuffer如何知道有哪些消费者?哪些gatingSequense是从哪里来的?
在构建RingBuffer注册处理类的时候,就将消费者Sequense注册到RingBuffer中了。
看代码的话,定位到gatingSequences在AbastractSequencer,对应的有个addGatingSequenses方法用于注入gatingSequence
public abstract class AbstractSequencer implements Sequencer { //... protected volatile Sequence[] gatingSequences = new Sequence[0]; @Override public final void addGatingSequences(Sequence... gatingSequences) { SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences); } //... }
再查看addGatingSequences被调用的地方,即通过RingBuffer的方法,设置到Sequencer中,这个Sequence是生产者使用的序号管理器
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { //... protected final Sequencer sequencer; public void addGatingSequences(Sequence... gatingSequences) { sequencer.addGatingSequences(gatingSequences); } //... }
而RingBuffer的addGatingSequence则在Disruptor配置处理器的时候被调用
public class Disruptor<T> { //... private final RingBuffer<T> ringBuffer; private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>(); public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors) { for (final EventProcessor processor : processors) { consumerRepository.add(processor); } final Sequence[] sequences = new Sequence[processors.length]; for (int i = 0; i < processors.length; i++) { sequences[i] = processors[i].getSequence(); } ringBuffer.addGatingSequences(sequences); return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors)); } //... }
缓存的意义是什么?
我们看到在SiingleProducerSequencer的next方法中,会缓存上一次的消费者最小序列号,这有什么用呢?
用途就是不需要每次都读取各消费者的序号,只要没超过上一次的最小值的地方都可以直接分配,如果超过了,则进行再次判断
为啥读取最小值不需要保证原子性?
看了这个获取最小消费序号的,可能会奇怪,为啥这个操作不需要上锁,这个不是会获取到旧值吗?
确实,这个最小值获取到的时候,实际上数值已经变更。但是由于我们的目的是为了防止指针越位,所以用旧值是没有问题的。(旧值<=实际上的最小值)
public static long getMinimumSequence(final Sequence[] sequences, long minimum) { for (int i = 0, n = sequences.length; i < n; i++) { long value = sequences[i].get(); minimum = Math.min(minimum, value); } return minimum; }