RingBuffer 源码解析

RingBuffer 主要用于在生产者和消费者之间传递数据。至于核心的控制机制已经被转移到 Sequencer 中了,具体的控制机制后续会有写专门的文章,这里先聚焦 RingBuffer。

RingBuffer 基本结构

缓存行填充技术在 Disruptor 中被大量使用,在这里同样也用到了,关于缓存行填充在上一篇文章中已经姜的很详细了,就是为了将 RingBuffer 中不会修改的值一直缓存在 CPU 缓存中,减少访问内存的次数。

而且 RingBuffer 中的对象在 Disruptor 启动时就会被创建后,后续会对这些对象进行重复利用,而不用创建新的对象,所以 Disruptor 中也基本不用进行垃圾回收,以下的代码就是用来初始化对象:

private void fill(EventFactory<E> eventFactory){    
    for (int i = 0; i < bufferSize; i++)    {        
        entries[BUFFER_PAD + i] = eventFactory.newInstance();    
    }
}

在初始化 RingBuffer 时,需要指定三个参数,EventFactory,bufferSize 和 WaitStrategy。EventFactory 就是用来生成数组中元素的工厂,如上代码。

bufferSize 是用来存储消息的数组的容量, 必须是 2 的次方,在 Java 的 HashMap 中,也需要保证容量是 2 的次方,这两个地方的设计是相同的,都是为了可以快速找到目标地址。

我们知道 RingBuffer 中是使用循环数组来存储消息,获取元素位置的时候一般会使用 & 来获取元素位置,如下:

protected final E elementAt(long sequence){   
      return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

sequence 就是当前消息的序列号,indexMask 的值是 bufferSize - 1。在 bufferSize 的值为 2 的 N 次方的情况下,sequence & indexMask 的值就是元素的位置。在 HashMap 中,如果传入的值不是 2 的 N 次方,会被自动转成最接近传入值的 2 的 N 次方,而在 Disruptor 中,则会直接报错。

其实利用取余操作也可以获取到元素的位置,但是取余的操作效率是比 & 操作效率低,对于 Disruptor 这么最求性能极致的框架来说,怎么会放过这个压榨性能的机会。

WaitStrategy 是消费者在等待 RingBuffer 中消息的策略,默认是 BlockingWaitStrategy,也就是获取不到消息时进行阻塞,也可以根据的自己的需要实现自己的等待策略。

创建 RingBuffer 时,可以指定是一个生产者还是多个生产者,但实际就是创建不同的 Sequencer,Sequencer 的原理我们下篇文章再讲:

// 生成支持多个生产者的 RingBuffer, 实际是创建不同的 Sequencer
public static <E> RingBuffer<E> createMultiProducer( EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
    MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

// 生成支持单个生产者的 RingBuffer
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

除了上面两种默认的方式之外,还可以通过 create 方法来创建指定类型的 RingBuffer。

RingBuffer 的核心功能就是存储和传输消息,其他功能已经被抽离到其他组件中。

RingBuffer 如何传输数据

RingBuffer 的核心功能就是将消息从生产者取过来,然后给消费者处理。

生产者生产数据

在 RingBuffer 中有很多 translateAndPublish 重载方法,支持不同的参数,但是核心都是传入一个 EventTranslator 的实现类,并且需要实现 translateTo 方法,也就是说,如何将数据放进 RingBuffer 操作又回到了用户的手上。

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0)
{
    try
    {
        translator.translateTo(get(sequence), sequence, arg0);
    }
    finally
    {
        sequencer.publish(sequence);
    }
}

简单的 EventTranslator 实现如下:

EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>()
            {
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
                {
                    event.set(bb.getLong(0));
                }
            };

除了利用 EventTranslator,我们也可以自己来实现完整的生产者,如下:

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  
        try
        {
            LongEvent event = ringBuffer.get(sequence); 
            event.set(bb.getLong(0));  
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

对比这两种不同的实现可以发现,在将消息存入 RingBuffer 时,需要获取一个 sequence,并且在存入数据后需要调用 publish 方法。

获取 sequence 表示的就是当前可以存入数据的位置,在存入数据之后,需要通过 publish 方法高速消费者这个位置的消息可用。

消费者处理数据

在实例化 Disruptor 时,需要配置 Handler:

Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler());

同样,数据具体如何处理,也由用户自己来决定:

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

Disruptor 支持多个消费者,所以可以添加多个消费者:

disruptor.handleEventsWith(new LongEventHandler(), new AnotherLongEventHandler());

当然也可以调整这几个 Handler 之间的执行顺序:

 disruptor.handleEventsWith(new LongEventHandler()).then(new AnotherLongEventHandler());

随着 Disruptor 的演进,RingBuffer 功能也慢慢变的单一,只要负责数据的存储和传输,不需要负责具体的控制逻辑,这样整个框架结构更je简单,内聚性更好,这个符合 Disruptor 追求极致性能的目标。

©2024 Rayjun    PowerBy Hexo