阅读目录
具体使用示例代码我这里就不贴,大家可以看我上一篇博客;
初始化;首先在启动的时候,需要预先初始化 RingBuffer,所以需要传入 EventFactory;这里和 JUC 里面 Queue 很不一样的地方地方是,RingBuffer 中的 Event 不会被取出,每次 publish 的时候都是覆盖之前的内容,所以 RingBuffer 这里是不会产生 GC 的;而生产者和消费者都持有一个 Sequence,指示当前的处理位置,当需要获取 Event 的时候,可以直接使用
sequence & ringBuffer.size - 1
除留余数法快速找到对应的数组位置;private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
生产者;同时可以指定 Disruptor 是单生产者还是多生产者:
- ProducerType.SINGLE - SingleProducerSequencer :因为只有一个生产者,所以者更新 sequence 的时候是不需要加锁的;
- ProducerType.MULTI - MultiProducerSequencer :多个生产者的时候,使用乐观锁机制更新 sequence,即
UNSAFE.compareAndSwapLong
;
当没有空余位置的时候他们都是使用
LockSupport.parkNanos(1L);
来阻塞线程的,如果有需要你也可以改成其他的等待模式;// RingBuffer // 首先通过 Sequencer 拿到下一个可用的序列 public long next() { return sequencer.next(); } // 然后用除留余数发拿到对应的数组元素 public E get(long sequence) { return elementAt(sequence); } // 这里是使用 UNSAFE 直接获取内存对象 protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } // 最后将拿到的数组元素修改为新的 Event,再发布 public void publish
关键字: