1. RingBuffer.next -> Sequencer.next: this call may block, depending on how fast the consumers are consuming.
```java
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
    long minSequence;
    // Wait until the slowest consumer has moved past the slot we are about to overwrite
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
    {
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }
    this.cachedValue = minSequence;
}
```
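To make the wrap-point check concrete, here is a minimal single-producer sketch. `ToySingleProducerSequencer`, its single `AtomicLong` consumer sequence and the non-blocking `tryNext` are hypothetical simplifications, not Disruptor's API, and the `cachedGatingSequence > nextValue` branch of the real condition is left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical toy sequencer: one producer thread, one consumer sequence.
final class ToySingleProducerSequencer {
    private final int bufferSize;
    private final AtomicLong consumerSequence; // last sequence the consumer has finished
    private long nextValue = -1;               // last claimed sequence; plain field, single writer
    private long cachedValue = -1;             // cached copy of the consumer sequence

    ToySingleProducerSequencer(int bufferSize, AtomicLong consumerSequence) {
        this.bufferSize = bufferSize;
        this.consumerSequence = consumerSequence;
    }

    /** Parks until the slot for the next sequence has been consumed, then claims it. */
    long next() {
        long next = nextValue + 1;
        long wrapPoint = next - bufferSize; // the sequence that last used this slot
        if (wrapPoint > cachedValue) {
            long min;
            while (wrapPoint > (min = consumerSequence.get())) {
                LockSupport.parkNanos(1L); // producer blocks here when consumers are slow
            }
            cachedValue = min;
        }
        nextValue = next;
        return next;
    }

    /** Non-blocking variant: returns -1 instead of waiting when the buffer is full. */
    long tryNext() {
        long next = nextValue + 1;
        long wrapPoint = next - bufferSize;
        if (wrapPoint > consumerSequence.get()) {
            return -1;
        }
        nextValue = next;
        return next;
    }
}
```

Because only one thread ever claims sequences, `nextValue` needs no CAS; the only shared state the producer reads is the consumer's sequence.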
2. RingBuffer.publish -> Sequencer.publish -> WaitStrategy.signalAllWhenBlocking: if a consumer is blocked in the wait strategy, this wakes it up so it can consume.
EventProcessor -> SequenceBarrier.waitFor -> EventHandler.onEvent: if consumption catches up with production, the SequenceBarrier blocks the consumer until new events are published.
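Steps 2 and 3 form a sleep/wake handshake between producer and consumer. The sketch below assumes a lock-and-condition strategy similar in spirit to Disruptor's `BlockingWaitStrategy`; `ToyBlockingWaitStrategy` and `ToyPublisher` are hypothetical names, and a plain `AtomicLong` stands in for the cursor.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical lock-based wait strategy; not Disruptor's source.
final class ToyBlockingWaitStrategy {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();

    /** Consumer side: block until the cursor reaches `sequence`. */
    long waitFor(long sequence, AtomicLong cursor) throws InterruptedException {
        long available;
        if ((available = cursor.get()) < sequence) {
            lock.lock();
            try {
                while ((available = cursor.get()) < sequence) {
                    published.await(); // releases the lock while sleeping
                }
            } finally {
                lock.unlock();
            }
        }
        return available;
    }

    /** Producer side: wake every consumer sleeping in waitFor. */
    void signalAllWhenBlocking() {
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

final class ToyPublisher {
    final AtomicLong cursor = new AtomicLong(-1);
    final ToyBlockingWaitStrategy waitStrategy = new ToyBlockingWaitStrategy();

    void publish(long sequence) {
        cursor.set(sequence);                 // make the event visible first
        waitStrategy.signalAllWhenBlocking(); // then wake any blocked consumer
    }
}
```

The producer advances the cursor before signalling, and the consumer re-checks the cursor while holding the lock, so a wake-up cannot slip in between the check and `await`.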
```java
// Method 1
disruptor.handleEventsWithWorkerPool(parserWorkPool).then(new ThenHandler());

// Method 2
SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
for (int i = 0; i < parserThreadCount; i++)
{
    handlers[i] = new DrcMsgParser(i, parserThreadCount);
    batchParseProcessors[i] = new BatchEventProcessor(
            disruptorMsgBuffer, sequenceBarrier, handlers[i]);
    disruptorMsgBuffer.addGatingSequences(batchParseProcessors[i].getSequence());
}
// The parsers' sequences become the dependencies of the downstream processor
Sequence[] gatingSequences = new Sequence[parserThreadCount];
for (int i = 0; i < parserThreadCount; i++)
{
    gatingSequences[i] = batchParseProcessors[i].getSequence();
}
SequenceBarrier offerSquenceBarrier = disruptorMsgBuffer.newBarrier(gatingSequences);
offerProcess = new BatchEventProcessor(disruptorMsgBuffer, offerSquenceBarrier,
        new OperationBufferScanThread(transferer));
disruptorMsgBuffer.addGatingSequences(offerProcess.getSequence());
```
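In method 2, `offerSquenceBarrier` is built from the parsers' sequences, so the downstream processor can only advance as far as the slowest parser. A minimal sketch of that rule, assuming plain `AtomicLong`s in place of `Sequence`; `DependencyBarrier` is a hypothetical name, modelled on what `Util.getMinimumSequence` computes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical barrier: a downstream processor may only read up to the slowest upstream one.
final class DependencyBarrier {
    private final AtomicLong[] upstream;

    DependencyBarrier(AtomicLong... upstream) {
        this.upstream = upstream;
    }

    /** Highest sequence that every upstream processor has finished. */
    long availableSequence() {
        long min = Long.MAX_VALUE;
        for (AtomicLong s : upstream) {
            min = Math.min(min, s.get());
        }
        return min;
    }
}
```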
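```java
// Consumer side: spin until the sequence it depends on (the cursor, or the
// upstream processors' sequences) has reached the requested sequence
while ((availableSequence = dependentSequence.get()) < sequence)
{
    barrier.checkAlert();
}
```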
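The loop above is a busy spin: the consumer burns CPU until the sequence it depends on catches up, and `checkAlert` is its only way out when the processor is being halted. A self-contained sketch of the same loop; `ToySpinBarrier` and the unchecked `ToyAlertException` are hypothetical (Disruptor's own `AlertException` is a checked exception), and the `Thread.onSpinWait()` hint (Java 9+) is an addition not present in the original loop.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical unchecked alert, used here only to keep the sketch short.
final class ToyAlertException extends RuntimeException {
    ToyAlertException() {
        super("barrier alerted");
    }
}

// Hypothetical busy-spin barrier.
final class ToySpinBarrier {
    private final AtomicLong dependentSequence; // the cursor, or the minimum of upstream sequences
    private volatile boolean alerted = false;

    ToySpinBarrier(AtomicLong dependentSequence) {
        this.dependentSequence = dependentSequence;
    }

    void alert() {
        alerted = true; // e.g. set when the processor is asked to halt
    }

    private void checkAlert() {
        if (alerted) {
            throw new ToyAlertException();
        }
    }

    long waitFor(long sequence) {
        long availableSequence;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            checkAlert();        // lets a halting processor escape the spin
            Thread.onSpinWait(); // spin-loop hint to the CPU
        }
        return availableSequence;
    }
}
```

`waitFor` returns the highest available sequence, which may be ahead of the requested one, so the caller can process a whole batch before waiting again.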
Single Writer Principle
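What I used to think made Disruptor fast was: 1. the RingBuffer, 2. padding to prevent cache-line false sharing, 3. CAS operations. Unfortunately I had not really understood point 1, and point 3 is the exact opposite of the truth. Disruptor is better described as a practical application of the single writer principle.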
The author's post is at http://mechanical-sympathy.blogspot.co.uk/2011/09/single-writer-principle.html
These lock/CAS instructions cost 10s of cycles in the best case when un-contended, plus they cause out-of-order execution for the CPU to be suspended and load/store buffers to be flushed. At worst, collisions occur and the kernel will need to get involved and put one or more of the threads to sleep until the lock is released.
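In other words: even when uncontended, a lock or CAS instruction costs tens of CPU cycles, and it also suspends the CPU's out-of-order execution and flushes the load/store buffers. In the worst case threads collide, and the kernel has to step in and put one or more of them to sleep until the lock is released.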
It is interesting to see the emergence of numerous approaches that lend themselves to single writer solutions such as Node.js, Erlang, Actor patterns, and SEDA to name a few. Unfortunately most use queue based implementations underneath, which breaks the single writer principle, whereas the Disruptor strives to separate the concerns so that the single writer principle can be preserved for the common cases.
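That is, several approaches that lend themselves to single-writer designs have emerged, such as Node.js, Erlang, the Actor pattern and SEDA. Unfortunately most of them are built on queues underneath, and a queue itself breaks the single writer principle; the Disruptor, by contrast, tries to keep the principle intact.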
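(LinkedBlockingQueue uses locks: a takeLock guards `head` and a putLock guards `last`, and the element count is an AtomicInteger that producers and consumers both update with atomic read-modify-write operations. ConcurrentLinkedQueue updates its head and tail pointers and each node's next pointer with CAS.)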
If a system is decomposed into components that keep their own relevant state model, without a central shared model, and all communication is achieved via message passing then you have a system without contention naturally. This type of system obeys the single writer principle if the messaging passing sub-system is not implemented as queues.
```java
// Body of the multi-producer claim loop: `continue` retries, `break` exits
current = cursor.get();
next = current + n;

long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

    if (wrapPoint > gatingSequence)
    {
        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
        continue;
    }

    gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
    // Producers race here; only one CAS succeeds and the others retry
    break;
}
```
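With several producers, claiming a slot needs a CAS on the shared cursor, which is exactly the kind of contended write the single writer principle avoids. A runnable sketch of the loop above, assuming a single consumer sequence in place of `gatingSequences`; `ToyMultiProducerSequencer` is a hypothetical name, and the `do { ... } while (true)` wrapper is an assumption about the elided loop header.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical multi-producer sequencer; one consumer sequence stands in for gatingSequences.
final class ToyMultiProducerSequencer {
    private final int bufferSize;
    private final AtomicLong cursor = new AtomicLong(-1); // highest claimed sequence
    private final AtomicLong gatingSequenceCache = new AtomicLong(-1);
    private final AtomicLong consumerSequence;            // last sequence the consumer finished

    ToyMultiProducerSequencer(int bufferSize, AtomicLong consumerSequence) {
        this.bufferSize = bufferSize;
        this.consumerSequence = consumerSequence;
    }

    /** Claims n sequences and returns the highest one; safe to call from many threads. */
    long next(int n) {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;

            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                long gatingSequence = Math.min(consumerSequence.get(), current);
                if (wrapPoint > gatingSequence) {
                    LockSupport.parkNanos(1); // buffer full: let the consumer catch up, then retry
                    continue;
                }
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                break; // this thread now owns sequences current+1 .. next
            }
            // otherwise another producer won the CAS: retry with the new cursor
        } while (true);
        return next;
    }
}
```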
```java
private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
```
```java
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
    {
        if (!isAvailable(sequence))
        {
            return sequence - 1;
        }
    }

    return availableSequence;
}
```
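Because a multi-producer cursor is advanced at claim time, before the event is written, consumers cannot trust the cursor alone: each slot records the lap around the ring in which it was last published, and `getHighestPublishedSequence` stops at the first gap. A sketch, assuming `AtomicIntegerArray.lazySet` as a stand-in for `Unsafe.putOrderedInt` and a hypothetical `ToyAvailableBuffer` class.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical availability buffer for a multi-producer ring of power-of-two size.
final class ToyAvailableBuffer {
    private final int indexMask;
    private final int indexShift;
    private final AtomicIntegerArray availableBuffer;

    ToyAvailableBuffer(int bufferSize) {
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of two");
        }
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize);
        availableBuffer = new AtomicIntegerArray(bufferSize);
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1); // nothing published yet
        }
    }

    void publish(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift); // which lap around the ring
        availableBuffer.lazySet(index, flag);       // ordered store, like putOrderedInt
    }

    boolean isAvailable(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift);
        return availableBuffer.get(index) == flag;
    }

    long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1; // stop at the first unpublished slot
            }
        }
        return availableSequence;
    }
}
```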
Lazy Set!
```java
public void set(final long value)
{
    UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
```
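`putOrderedLong` is the Unsafe form of lazy set. On Java 9 and later the same store is available without Unsafe as `VarHandle.setRelease`, which is how `AtomicLong.lazySet` is specified there. A hypothetical `ToySequence` using that API, with a full volatile store alongside for comparison:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical Sequence-like class built on VarHandle instead of Unsafe.
final class ToySequence {
    private static final VarHandle VALUE;

    static {
        try {
            VALUE = MethodHandles.lookup().findVarHandle(ToySequence.class, "value", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile long value;

    ToySequence(long initial) {
        this.value = initial;
    }

    long get() {
        return (long) VALUE.getVolatile(this); // same as reading the volatile field
    }

    /** Lazy set: release store, no trailing StoreLoad barrier. Enough for a single writer. */
    void set(long v) {
        VALUE.setRelease(this, v);
    }

    /** Full volatile store: followed by a StoreLoad barrier. */
    void setVolatile(long v) {
        VALUE.setVolatile(this, v);
    }
}
```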
This post sums up the relationship between lazy set and the single writer principle very well.
1. How does lazy set differ from an ordinary volatile set?
lazySet provides a preceeding store-store barrier (which is either a no-op or very cheap on current platforms), but no store-load barrier
In other words, lazySet is preceded by a store-store barrier, which on current hardware is either a no-op or very cheap, but it is not followed by a store-load barrier.
2. What does a store-store barrier guarantee?
StoreStore Barriers: The sequence: Store1; StoreStore; Store2 ensures that Store1's data are visible to other processors (i.e.,flushed to memory) before the data associated with Store2 and all subsequent store instructions.
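That is, in the sequence Store1; StoreStore; Store2, Store1's data becomes visible to other processors before the data of Store2 and of every later store. Put differently, the barrier fixes the order in which writes become visible to other processors.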
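This ordering is what makes lazy set sufficient for publishing events: the event data (Store1) is written before the cursor (Store2), so a consumer that observes the new cursor also observes the data. A sketch with hypothetical names, assuming a single producer and no wrap-around (the gating check is omitted):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer publish: plain write of the slot, then lazySet of the cursor.
final class ToyPublishDemo {
    static final int SIZE = 1024;
    final long[] slots = new long[SIZE];          // plain, non-volatile event storage
    final AtomicLong cursor = new AtomicLong(-1); // last published sequence

    void produce(long sequence, long value) {
        slots[(int) (sequence & (SIZE - 1))] = value; // Store1: event data
        cursor.lazySet(sequence);                     // StoreStore, then Store2: publish
    }

    long consume(long sequence) {
        while (cursor.get() < sequence) { // volatile read of the cursor
            Thread.onSpinWait();
        }
        // Seeing the published cursor implies seeing the data written before it
        return slots[(int) (sequence & (SIZE - 1))];
    }
}
```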
3. How lazy set relates to the single writer principle
But if there is only a single writer we don't need to do that, as we know no one will ever change the data but us.
And from that follows that strictly speaking lazySet is at the very least as correct as a volatile set for a single writer.
If only one thread writes, we don't need the store-load barrier, so under the single writer principle lazySet is just as correct as a volatile set.
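The difference shows up in how a counter is updated. The sketch below is illustrative only (both classes are hypothetical): with one owning thread, `get` followed by `lazySet` cannot lose updates because no other writer can interleave, while several writers need an atomic read-modify-write such as `incrementAndGet`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counter owned by exactly one writer thread.
final class SingleWriterCounter {
    private final AtomicLong value = new AtomicLong();

    /** Must only ever be called from the owning thread. */
    void increment() {
        value.lazySet(value.get() + 1); // no other writer can interleave between get and lazySet
    }

    long get() {
        return value.get(); // any thread may read
    }
}

// Hypothetical counter shared by several writers.
final class MultiWriterCounter {
    private final AtomicLong value = new AtomicLong();

    void increment() {
        value.incrementAndGet(); // atomic read-modify-write, contended across writers
    }

    long get() {
        return value.get();
    }
}
```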
4. Does lazy set hurt visibility? (Is it too lazy?)
Will lazySet write actually happens in some finite time?
The most you can say from the spec is that it will be written no later than at the point that the process must write anything else in the Synchronization Order, if such a point exists. However, independently of the spec, we know that so long as any process makes progress, only a finite number of writes can be delayed. So, yes.
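This passage is a bit convoluted, but the conclusion is that a lazy-set write will always become visible eventually. Doug Lea's description does not guarantee that the visibility delay is small, though, so the post's author ran some experiments, and the data indicate that in practice the delay is quite small.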
However, in practice, these writes usually become visible to the other threads within a few nanoseconds.
Disruptor packs bottomless depth into a tiny amount of code. Martin Thompson, I bow down to you...
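1. On x86, a volatile read is very cheap, while a volatile write is followed by a StoreLoad barrier, which is relatively expensive. The cost is put on the write side because most programs read far more often than they write.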
Peterson's algorithm achieves mutual exclusion between two threads using three variables, flag0, flag1 and turn, plus a loop:
while flag1 == true && turn == 1: thread 1 wants the lock and it is thread 1's turn, so thread 0 waits.
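A Java sketch of Peterson's algorithm for two threads; `PetersonLock` is a hypothetical name. The three shared variables are `volatile` on purpose: each thread stores to its flag and `turn` and then loads the other thread's flag, and if that store could be reordered after the load (the StoreLoad case from point 1), both threads could enter at once. Using `Thread.yield()` in the wait loop is a choice for this demo, not part of the algorithm.

```java
// Hypothetical two-thread Peterson lock; thread ids are 0 and 1.
final class PetersonLock {
    private volatile boolean flag0 = false;
    private volatile boolean flag1 = false;
    private volatile int turn = 0;

    void lock(int id) {
        if (id == 0) {
            flag0 = true;                // thread 0 wants the lock
            turn = 1;                    // but gives priority to thread 1
            while (flag1 && turn == 1) { // thread 1 wants it and has priority: wait
                Thread.yield();
            }
        } else {
            flag1 = true;
            turn = 0;
            while (flag0 && turn == 0) {
                Thread.yield();
            }
        }
    }

    void unlock(int id) {
        if (id == 0) {
            flag0 = false;
        } else {
            flag1 = false;
        }
    }
}
```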
4. https://juejin.im/post/6844904144273145863 This article finally made memory barriers completely clear to me.