RingBuffer 源码解析
RingBuffer 主要用于在生产者和消费者之间传递数据。至于核心的控制机制已经被转移到 Sequencer
中了,具体的控制机制后续会有写专门的文章,这里先聚焦 RingBuffer。
RingBuffer 基本结构
缓存行填充技术在 Disruptor 中被大量使用,在这里同样也用到了,关于缓存行填充在上一篇文章中已经姜的很详细了,就是为了将 RingBuffer 中不会修改的值一直缓存在 CPU 缓存中,减少访问内存的次数。
而且 RingBuffer 中的对象在 Disruptor 启动时就会被创建后,后续会对这些对象进行重复利用,而不用创建新的对象,所以 Disruptor 中也基本不用进行垃圾回收,以下的代码就是用来初始化对象:
private void fill(EventFactory<E> eventFactory){
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
在初始化 RingBuffer 时,需要指定三个参数,EventFactory,bufferSize 和 WaitStrategy。EventFactory 就是用来生成数组中元素的工厂,如上代码。
bufferSize 是用来存储消息的数组的容量, 必须是 2 的次方,在 Java 的 HashMap 中,也需要保证容量是 2 的次方,这两个地方的设计是相同的,都是为了可以快速找到目标地址。
我们知道 RingBuffer 中是使用循环数组来存储消息,获取元素位置的时候一般会使用 & 来获取元素位置,如下:
protected final E elementAt(long sequence){
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
sequence 就是当前消息的序列号,indexMask 的值是 bufferSize - 1。在 bufferSize 的值为 2 的 N 次方的情况下,sequence & indexMask 的值就是元素的位置。在 HashMap 中,如果传入的值不是 2 的 N 次方,会被自动转成最接近传入值的 2 的 N 次方,而在 Disruptor 中,则会直接报错。
其实利用取余操作也可以获取到元素的位置,但是取余的操作效率是比 & 操作效率低,对于 Disruptor 这么最求性能极致的框架来说,怎么会放过这个压榨性能的机会。
WaitStrategy 是消费者在等待 RingBuffer 中消息的策略,默认是 BlockingWaitStrategy,也就是获取不到消息时进行阻塞,也可以根据的自己的需要实现自己的等待策略。
创建 RingBuffer 时,可以指定是一个生产者还是多个生产者,但实际就是创建不同的 Sequencer,Sequencer 的原理我们下篇文章再讲:
// 生成支持多个生产者的 RingBuffer, 实际是创建不同的 Sequencer
public static <E> RingBuffer<E> createMultiProducer( EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 生成支持单个生产者的 RingBuffer
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
除了上面两种默认的方式之外,还可以通过 create 方法来创建指定类型的 RingBuffer。
RingBuffer 的核心功能就是存储和传输消息,其他功能已经被抽离到其他组件中。
RingBuffer 如何传输数据
RingBuffer 的核心功能就是将消息从生产者取过来,然后给消费者处理。
生产者生产数据
在 RingBuffer 中有很多 translateAndPublish 重载方法,支持不同的参数,但是核心都是传入一个 EventTranslator
的实现类,并且需要实现 translateTo
方法,也就是说,如何将数据放进 RingBuffer 操作又回到了用户的手上。
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0)
{
try
{
translator.translateTo(get(sequence), sequence, arg0);
}
finally
{
sequencer.publish(sequence);
}
}
简单的 EventTranslator 实现如下:
EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
除了利用 EventTranslator,我们也可以自己来实现完整的生产者,如下:
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next();
try
{
LongEvent event = ringBuffer.get(sequence);
event.set(bb.getLong(0));
}
finally
{
ringBuffer.publish(sequence);
}
}
}
对比这两种不同的实现可以发现,在将消息存入 RingBuffer 时,需要获取一个 sequence,并且在存入数据后需要调用 publish 方法。
获取 sequence 表示的就是当前可以存入数据的位置,在存入数据之后,需要通过 publish 方法高速消费者这个位置的消息可用。
消费者处理数据
在实例化 Disruptor 时,需要配置 Handler:
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler());
同样,数据具体如何处理,也由用户自己来决定:
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
Disruptor 支持多个消费者,所以可以添加多个消费者:
disruptor.handleEventsWith(new LongEventHandler(), new AnotherLongEventHandler());
当然也可以调整这几个 Handler 之间的执行顺序:
disruptor.handleEventsWith(new LongEventHandler()).then(new AnotherLongEventHandler());
随着 Disruptor 的演进,RingBuffer 功能也慢慢变的单一,只要负责数据的存储和传输,不需要负责具体的控制逻辑,这样整个框架结构更je简单,内聚性更好,这个符合 Disruptor 追求极致性能的目标。