2016年1月30日星期六

学习一下Disruptor

最近项目用到了Disruptor,趁机好好学习了一把,先上类图















几个重要的流程


发布数据


1. RingBuffer.next -> Sequencer.next 这里可能会block住取决于消费端的消费速度。
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
    long minSequence;
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
    {
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }

    this.cachedValue = minSequence;
}

2. RingBuffer.publish -> Sequencer.publish -> WaitStrategy.signalAllWhenBlocking 如果消费者在这里block住了,会通知消费者消费。


消费数据


EventProcessor -> SequenceBarrier.waitFor -> EventHandler.onEvent 如果消费速度赶上生产速度SequenceBarrier会把消费流程block住。
Disruptor提供了两种预设的消费者,BatchEventProcessor,WorkProcessor,区别在于BatchEventProcessor是批量读取消费,而WorkProcessor是单条消费一组WorkProcessor组成一个WorkerPool共享一个消费Sequence实现协同工作。

WorkerPool这种工作方式因为有多个线程共享的Sequence所以违反了单写原则是有一定性能损耗的,另一种实现协同工作的方式是使用多个BatchEventProcessor一起消费,这种情况下每个BatchEventProcessor都是消费相同的数据,但是可以在BatchEventProcessor里面通过hash取模的方式过滤掉应该被其它线程拉取的数据。


多级消费


// 方法1
disruptor.handleEventsWithWorkerPool(parserWorkPool).then(new ThenHandler());

// 方法2
SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();

for (int i = 0; i < parserThreadCount; i++) {
    handlers[i] = new DrcMsgParser(i,parserThreadCount);
    batchParseProcessors[i] = new BatchEventProcessor(
            disruptorMsgBuffer, sequenceBarrier, handlers[i]);
    disruptorMsgBuffer.addGatingSequences(batchParseProcessors[i].getSequence());
}

Sequence[] gatingSequences = new  Sequence[parserThreadCount];

for (int i = 0; i < parserThreadCount; i++) {
    gatingSequences[i]=batchParseProcessors[i].getSequence();
}

SequenceBarrier offerSquenceBarrier = disruptorMsgBuffer.newBarrier(gatingSequences);
offerProcess = new BatchEventProcessor(disruptorMsgBuffer,offerSquenceBarrier, new OperationBufferScanThread(transferer));
disruptorMsgBuffer.addGatingSequences(offerProcess.getSequence());

上面两种方法都可以实现多级消费,方法1是方法2的封装,方法2中核心是,


SequenceBarrier offerSquenceBarrier = disruptorMsgBuffer.newBarrier(gatingSequences);
offerProcess = new BatchEventProcessor(disruptorMsgBuffer,offerSquenceBarrier, new OperationBufferScanThread(transferer));


在BlockingWaitStrategy中


while ((availableSequence = dependentSequence.get()) < sequence)
{
    barrier.checkAlert();
}


次级消费者的消费Sequence会一直小于前面一级消费者的最小Sequence



单写原则(Single Writer Principle)

我之前所以为的Disruptor快的原因是,1.RingBuffer 2.防止Cache false sharing的padding 3.Cas操作,可惜1没有理解到位而3正好相反了。应该说Disruptor是单写原则的一个实践。

作者的帖子在 http://mechanical-sympathy.blogspot.co.uk/2011/09/single-writer-principle.html


在这里我寻章摘句一下,

 These lock/CAS instructions cost 10s of cycles in the best case when un-contended, plus they cause out-of-order execution for the CPU to be suspended and load/store buffers to be flushed.  At worst, collisions occur and the kernel will need to get involved and put one or more of the threads to sleep until the lock is released.  

lock/CAS指令在没有竞争的情况下需要消耗数十个cpu周期加上执行过程中可能会等待load/store buffer同步。更坏的情况下内核可能需要介入把某些线程强制休眠知道锁被释放。

It is interesting to see the emergence of numerous approaches that lend themselves to single writer solutions such as Node.js, Erlang, Actor patterns, and SEDA to name a few.  Unfortunately most use queue based implementations underneath, which breaks the single writer principle, whereas the Disruptor strives to separate the concerns so that the single writer principle can be preserved for the common cases.

现在有一些有趣实现单写原则的方式,比如说Node.js, Erlang, Actor模式和SEDA等等。可惜的是这些实现方式的底层多数都是基于queue实现的,而queue本身是违背单写原则的,反之Disruptor则力求保持单写原则。
(LinkedBlockingQueue有锁,而且head,last指针和size变量都是需要Cas的。 ConcurrentLinkedQueue也是有head,tail,next指针需要Cas)

If a system is decomposed into components that keep their own relevant state model, without a central shared model, and all communication is achieved via message passing then you have a system without contention naturally.  This type of system obeys the single writer principle if the messaging passing sub-system is not implemented as queues.



如果一个系统被分解成很多自己封装好状态的组件而没有一个中央共享的结构并且所有的通信都是基于不是用queue实现的消息系统,那这样的系统是符合单写原则的。


Disruptor的单写原则












上面这张图是单个生产者,单个消费者的交互流程。流程中有两个位置需要限流,(1)消费者等待生产者,这是通过BatchEventProcessor.sequence<Sequencer.cursor来实现的。(2)生产者等待消费者,这是通过Sequencer.cursor-bufferSize<gatingSequence来实现的。BatchEventProcessor.sequence是消费者线程在写,生产者线程在读,而Sequencer.cursor是生产者线程在写,消费者线程在读,所以都是符合单写原则的。扩展到多个消费者,因为多个消费者都是自己维护自己的sequence,所以还是能保持单写原则。不过WorkerPool这种消费方式由于多个worker会共享workSequence,所以还是有点违反单写原则的。

多个生产者的情况要复杂一些,生产者和消费者之间的交互和单个生产者的情况基本相同,但是还需要考虑生产者之间的交互,主要的逻辑在MultiProducerSequencer里面。

MultiProducerSequencer.next

current = cursor.get();
next = current + n;

long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

    if (wrapPoint > gatingSequence)
    {
        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
        continue;
    }

    gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
    break;
}

首先要说上面这段代码的最后用了Cas,所以还是有一点违背了单写原则,这个问题Disruptor的作者也是承认的。和单个生产者还有一个不同地方就是在next方法里面就更新了cursor,消费者的block就有可能被解除了。

MultiProducerSequencer.publish->setAvailable->setAvailableBufferValue

private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}


这里为了尽量维持单写原则用了一个很cool的技巧,availableBuffer是一个长度和RingBuffer相同的缓存,存的flag的含义是index这个位置上RingBuffer的生产者已经绕了几圈。

在ProcessingSequenceBarrier.waitFor里面除了等待BatchEventProcessor.sequence<Sequencer.cursor之外,会返回一个当前发布的最大sequence。

public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
    {
        if (!isAvailable(sequence))
        {
            return sequence - 1;
        }
    }

    return availableSequence;
}

在这里之前之前缓存的那个圈数就可以用来判断某个sequence是否已经published。availableBuffer虽然被多个线程写了,但是对于里面的一个slot在一个时间点是只会有一个线程去写的。


Lazy Set!

促使我仔细学习Disruptor的动力在于浏览Sequence代码的时候看到这一段

public void set(final long value)
{
    UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}

Disruptor里面默认的set方法是lazySet的,之前基本没有这么玩过,极端不明觉厉。

http://psy-lob-saw.blogspot.co.uk/2012/12/atomiclazyset-is-performance-win-for.html

这篇帖子对lazy set和单写原则之间的关系总结的很好。

1. lazy set和普通的volatile set有什么不同?

lazySet provides a preceeding store-store barrier (which is either a no-op or very cheap on current platforms), but no store-load barrier

lazySet会前置一个store-store barrier(在当前的硬件体系下或者是no-op或者非常轻),而不是store-load barrier

2. store-store barrier能保证什么?

StoreStore Barriers: The sequence: Store1; StoreStore; Store2 ensures that Store1's data are visible to other processors (i.e.,flushed to memory) before the data associated with Store2 and all subsequent store instructions.

在操作序列Store1; StoreStore; Store2中,Store1的数据会在Store2和后续写操作之前对其它处理器可见。换句话说,就是保证了对其它数据可见的写的顺序。

3. lazy set和单写原则的关系

But if there is only a single writer we don't need to do that, as we know no one will ever change the data but us.
And from that follows that strictly speaking lazySet is at the very least as correct as a volatile set for a single writer.

如果只有一个线程写我们就用不着store-load barrier,lazySet和volatile set在单写原则下面是等价的。

4. lazy set会不会影响可见性?(too lazy)

Will lazySet write actually happens in some finite time?
The most you can say from the spec is that it will be written no later than at the point that the process must write anything else in the Synchronization Order, if such a point exists. However, independently of the spec, we know that so long as any process makes progress, only a finite number of writes can be delayed. So, yes.

这一段有点绕口,不会结论是lazy set一定会可见的。但是从Doug Lea的描述中并不能保证lazySet的可见性延迟一定会很小,所以作者做了一些实验,给出的数据说明lazy set的延迟其实应该是相当小的。

http://robsjava.blogspot.jp/2013/06/a-faster-volatile.html

However, in practice, these writes usually become visible to the other threads within a few nanoseconds.

上面这篇帖子也是说lazySet的写在实践上来延迟是纳秒级。

在Disruptor里面没有setVolatile调用,主流程里面compareAndSet的调用只有一个地方,其它所有的更新都是lazySet。


Disruptor真是方寸之间深不见底,Martin Thompson请收下我的膝盖。。。


补充一些信息省的老忘老去找

1.volatile read在x86体系下面非常轻量,volatile write后面会跟StoreLoad屏障比较重,这么做是因为大多数程序都是读比写多很多。
http://brooker.co.za/blog/2012/09/10/volatile.html

2.没有StoreLoad屏障会怎样?看Peterson算法

Peterson算法用三个变量flag0, flag1, turn加循环实现了线程互斥访问,
flag0=ture先声明自已要锁
turn=1先尝试把资源让给线程1防止死锁
while flag1==true && turn==1 线程1要锁且得到了锁所以等待
flag0=false声明自己不要锁了
基于共享变量很巧妙的锁算法。。。

如果flag0=true和flag1==true重排,就有可能发生两个线程都可以同时通过while判断

3.从内存模型角度看线程0和线程1可以同时访问变量x,变量x在不同core的buffer里面,所以x的最终结果可能是错误的。但如果只有一个线程写,那它始终只能在一个core的buffer里面,所以不可能读到一个陈旧的值,然后根据这个值去计算然后更新出错误的结果

4.https://juejin.im/post/6844904144273145863 这篇文章把内存屏障彻底理解清楚了