- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
Disruptor的高性能,是多种技术结合以及本身架构的结果。本文主要讲源码,涉及到的相关知识点需要读者自行去了解,以下列出:
此节结合demo来看更容易理解:传送门 。
下图来自官方文档 。
官方原图有点乱,我翻译一下 。
在讲原理前,先了解 Disruptor 定义的术语 。
Event 。
存放数据的单位,对应 demo 中的 LongEvent 。
Ring Buffer 。
环形数据缓冲区:这是一个首尾相接的环,用于存放 Event ,用于生产者往其存入数据和消费者从其拉取数据 。
Sequence 。
序列:用于跟踪进度(生产进度、消费进度) 。
Sequencer 。
Disruptor的核心,用于在生产者和消费者之间传递数据,有单生产者和多生产者两种实现.
Sequence Barrier 。
序列屏障,消费者之间的依赖关系就靠序列屏障实现 。
Wait Strategy 。
等待策略,消费者等待生产者将发布的策略 。
Event Processor 。
事件处理器,循环从 RingBuffer 获取 Event 并执行 EventHandler.
Event Handler 。
事件处理程序,也就是消费者 。
Producer 。
生产者 。
环形数据缓冲区(RingBuffer),逻辑上是首尾相接的环,在代码中用数组来表示 Object[] 。Disruptor生产者发布分两步 。
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 获取下一个可用位置的下标(步骤1)
long sequence = ringBuffer.next();
try {
// 返回可用位置的元素
LongEvent event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
} finally {
// 发布
ringBuffer.publish(sequence);
}
这两个步骤由 Sequencer 完成,分为单生产者和多生产者实现 。
如果申请 2 个元素,则如下图所示(圆表示 RingBuffer) 。
// 一般不会有以下写法,这里为了讲解源码才使用next(2)
// 向RingBuffer申请两个元素
long sequence = ringBuffer.next(2);
for (long i = sequence-1; i <= sequence; i++) {
try {
// 返回可用位置的元素
LongEvent event = ringBuffer.get(i);
// 设置该位置元素的值
event.set(1);
} finally {
ringBuffer.publish(i);
}
}
next 申请成功的序列,cursor 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。 申请相当于占位置,发布需要一个一个按顺序发布 。
如果 RingBuffer 满了呢,在上图步骤二的基础上,生产者发布了3个元素,消费者消费1个。此时生产者再申请 2个元素,就会变成下图所示 。
只剩下 1 个空间,但是要申请 2个元素,此时程序会自旋等待空间足够.
接下来结合代码看,单生产者的 Sequencer 实现为 SingleProducerSequencer,先看看构造方法 。
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
protected long p1, p2, p3, p4, p5, p6, p7;
SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
long nextValue = Sequence.INITIAL_VALUE;
long cachedValue = Sequence.INITIAL_VALUE;
}
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
这是 Disruptor 高性能的技巧之一,SingleProducerSequencer 需要的类变量只有 nextValue 和cachedValue,p1 ~ p7 的作用是填充缓存行,这能保证 nextValue 和cachedValue 必定在独立的缓存行,我们可以用 ClassLayout 打印内存布局看看 。
接下来看如何获取序列号(也就是步骤一) 。
// 调用路径
// RingBuffer#next()
// SingleProducerSequencer#next()
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
//生产者当前序号值+期望获取的序号数量后达到的序号值
long nextSequence = nextValue + n;
//减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
long wrapPoint = nextSequence - bufferSize;
//从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’
//这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
long cachedGatingSequence = this.cachedValue;
//(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
//(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76
// 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
//gatingSequences就是消费者队列末尾的序列,也就是消费者消费到哪里了
//实际上就是获得处理的队尾,如果队尾是current的话,说明所有的消费者都执行完成任务在等待新的事件了
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
// 等待1纳秒
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
public void publish(long sequence)
{
// 更新序列号
cursor.set(sequence);
// 等待策略的唤醒
waitStrategy.signalAllWhenBlocking();
}
要解释的都在注释里了,gatingSequences 是消费者队列末尾的序列,对应着就是下图中的 ApplicationConsumer 的 Sequence 。
看完单生产者版,接下来看多生产者的实现。因为是多生产者,需要考虑并发的情况.
如果有A、B两个消费者都来申请 2 个元素 。
cursor 申请成功的序列,HPS 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。HPS 是我自己编的缩写,表示 getHighestPublishedSequence 方法的返回值 。
如图所示,只要申请成功,就移动 cursor 的位置。RingBuffer 并没有记录发布情况(图中的红绿颜色),这个发布情况由 MultiProducerSequencer 的 availableBuffer 来维护.
下面看代码 。
public final class MultiProducerSequencer extends AbstractSequencer
{
// 缓存的消费者中最小序号值,相当于SingleProducerSequencerFields的cachedValue
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 标记元素是否可用
private final int[] availableBuffer;
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
//减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
long wrapPoint = next - bufferSize;
//从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’
//这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
long cachedGatingSequence = gatingSequenceCache.get();
//(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
//(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76
// 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
// 使用cas保证只有一个生产者能拿到next
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
......
}
MultiProducerSequencer 和 SingleProducerSequencer 的 next()方法逻辑大致一样,只是多了CAS的步骤来保证并发的正确性。接着看发布方法 。
public void publish(final long sequence)
{
// 记录发布情况
setAvailable(sequence);
// 等待策略的唤醒
waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence)
{
// calculateIndex(sequence):获取序号
// calculateAvailabilityFlag(sequence):RingBuffer的圈数
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
// 上面相当于 availableBuffer[index] = flag 的高性能版
}
记录发布情况,其实相当于 availableBuffer[sequence] = 圈数 ,前面说了, availableBuffer 是用来标记元素是否可用的,如果消费者的圈数 ≠ availableBuffer中的圈数,则表示元素不可用 。
public boolean isAvailable(long sequence)
{
int index = calculateIndex(sequence);
// 计算圈数
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
// UNSAFE.getIntVolatile(availableBuffer, bufferAddress):相当于availableBuffer[sequence] 的高性能版
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
private int calculateAvailabilityFlag(final long sequence)
{
// 相当于 sequence % bufferSize ,但是位操作更快
return (int) (sequence >>> indexShift);
}
isAvailable() 方法判断元素是否可用,此方法的调用堆栈看完消费者就清楚了.
本小节介绍两个方面,一是 Disruptor 的消费者如何实现依赖关系的,二是消费者如何拉取消息并消费 。
我们看回这张图,每个消费者前都有一个 SequenceBarrier ,这就是消费者之间能实现依赖的关键。每个消费者都有一个 Sequence,表示自身消费的进度,如图中,ApplicationConsumer 的 SequenceBarrier 就持有 ReplicaionConsumer 和 JournalConsumer 的 Sequence,这样就能控制 ApplicationConsumer 的消费进度不超过其依赖的消费者.
下面看源码,这是 disruptor 配置消费者的代码.
EventHandler journalConsumer = xxx;
EventHandler replicaionConsumer = xxx;
EventHandler applicationConsumer = xxx;
disruptor.handleEventsWith(journalConsumer, replicaionConsumer)
.then(applicationConsumer);
// 下面两行等同于上面这行
// disruptor.handleEventsWith(journalConsumer, replicaionConsumer);
// disruptor.after(journalConsumer, replicaionConsumer).then(applicationConsumer);
先看ReplicaionConsumer 和 JournalConsumer 的配置 disruptor.handleEventsWith(journalConsumer, replicaionConsumer) 。
/** 代码都在Disruptor类 **/
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
// 没有依赖的消费者就创建新的Sequence
return createEventProcessors(new Sequence[0], handlers);
}
/**
* 创建消费者
* @param barrierSequences 当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组
* @param eventHandlers 事件消费逻辑的EventHandler数组
*/
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 对应此事件处理器组的序列组
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
// 创建消费者,注意这里传入了SequenceBarrier
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
// 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加
// 所谓门控,就是RingBuffer要知道在消费链末尾的那组消费者(也是最慢的)的进度,避免消息未消费就被写入覆盖
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
createEventProcessors() 方法主要做了3件事,创建消费者、保存eventHandler和消费者的映射关系、更新 gatingSequences 。
gatingSequences 我们在前面说过,生产者通过 gatingSequences 知道消费者的进度,防止生产过快导致消息被覆盖,更新操作在 updateGatingSequencesForNextInChain() 方法中 。
// 为消费链下一组消费者,更新门控序列
// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值
// processorSequences是本次要设置的事件处理器组的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
// 将本组序列添加到Sequencer中的gatingSequences中
ringBuffer.addGatingSequences(processorSequences);
// 将上组消费者的序列从gatingSequences中移除
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
// 取消标记上一组消费者为消费链末端
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
让我们把视线再回到消费者的设置方法 。
disruptor.handleEventsWith(journalConsumer, replicaionConsumer)
.then(applicationConsumer);
journalConsumer 和 replicaionConsumer 已经设置了,接下来是 applicationConsumer 。
/** 代码在EventHandlerGroup类 **/
public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
return handleEventsWith(handlers);
}
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return disruptor.createEventProcessors(sequences, handlers);
}
可以看到,设置 applicationConsumer 最终调用的也是 createEventProcessors() 方法,区别就在于 createEventProcessors() 方法的第一个参数,这里的 sequences 就是 journalConsumer 和 replicaionConsumer 这两个消费者的 Sequence 。
消费者的主要消费逻辑在 EventProcessor#run() 方法中,下面以 BatchEventProcessor 举例 。
// BatchEventProcessor#run()
// BatchEventProcessor#processEvents()
private void processEvents()
{
T event = null;
long nextSequence = sequence.get() + 1L;
while (true)
{
try
{
// 获取最大可用序列
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
...
// 执行消费逻辑
while (nextSequence <= availableSequence)
{
// dataProvider就是RingBuffer
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch ()
{
// 异常处理
}
}
}
方法简洁明了,在死循环中通过 sequenceBarrier 获取最大可用序列,然后从 RingBuffer 中获取 Event 并调用 EventHandler 进行消费。重点在 sequenceBarrier.waitFor(nextSequence); 中 。
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// 获取可用的序列,这里返回的是Sequencer#next方法设置成功的可用下标,不是Sequencer#publish
// cursorSequence:生产者的最大可用序列
// dependentSequence:依赖的消费者的最大可用序列
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
// 获取最大的已发布成功的序号(对于发布是否成功的校验在此方法中)
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
熟悉的 getHighestPublishedSequence() 方法,忘了就回去看看生产者小节。waitStrategy.waitFor() 对应着图片中的 waitFor() .
前面讲了消费者的处理逻辑,但是 BatchEventProcessor#run() 是如何被调用的呢,关键在于 disruptor.start(),
// Disruptor#start()
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
class EventProcessorInfo<T> implements ConsumerInfo
{
public void start(final Executor executor)
{
// eventprocessor就是消费者
executor.execute(eventprocessor);
}
}
还记得 consumerRepository 吗,没有就往上翻翻设置消费者那里的 disruptor.handleEventsWith() 方法.
所以启动过程就是 disruptor#start() → ConsumerInfo#start() → Executor#execute() → EventProcessor#run() 。
课后作业:Disruptor 的消费者使用了多少线程?
本文讲了 Disruptor 大体逻辑和源码,当然其高性能的秘诀不止文中描述的那些。还有不同的等待策略,Sequence 中使用 Unsafe 而不是JDK中的 Atomic 原子类等等.
最后此篇关于Disruptor-源码解读的文章就讲到这里了,如果你想了解更多关于Disruptor-源码解读的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
引言 深拷贝是指创建一个新对象,该对象的值与原始对象完全相同,但在内存中具有不同的地址。这意味着如果您对原始对象进行更改,则不会影响到复制的对象 常见的C#常见的深拷贝方式有以下4类:
人工智能是一种未来性的技术,目前正在致力于研究自己的一套工具。一系列的进展在过去的几年中发生了:无事故驾驶超过300000英里并在三个州合法行驶迎来了自动驾驶的一个里程碑;IBM Waston击败了
我已经阅读了所有 HERE Maps API 文档,但找不到答案。 HERE实时流量REST API输出中的XML标签是什么意思? 有谁知道如何解释这个输出(我在我的请求中使用了接近参数)? 最佳答
我的 iPad 应用程序工作正常,我将其留在现场进行测试,但现在崩溃了[保存时?] 这是崩溃日志, Incident Identifier: 80FC6810-9604-4EBA-A982-2009A
我的程序需要 qsort 的功能才能运行,但到目前为止还没有完成它的工作。 我实际上是在对单个字符值的数组进行排序,以便将它们分组,这样我就可以遍历数组并确定每个属性的计数。我的问题是 qsort 返
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
我正在尝试使用 AVR 代码对 Arduino Uno 进行编程,因为我不被允许在 9 月份开始的高级项目中使用 Arduino 库。我找到了数据表,让数字引脚正常工作,然后尝试通过 USB 串行连接
我遇到了多次崩溃,似乎 native iOS 方法正在从第三方库调用函数。这是一个例子: Thread: Unknown Name (Crashed) 0 libsystem_kernel.d
我理解如何按照 Dijkstra 算法的解释找到从头到尾的最短路径,但我不明白的是解释。在这里,从图中的图形来看,从 A 到 E 添加到我已知集合的顺序是 A,C,B,D,F,H,G,E 我没有得到的
我正在查看一些 Django 源代码并遇到了 this . encoding = property(lambda self: self.file.encoding) 究竟是做什么的? 最佳答案 其他两
Sentry 提供了很好的图表来显示消息频率,但关于它们实际显示的内容的信息很少。 这些信息是每分钟吗? 5分钟? 15分钟?小时? 最佳答案 此图表按分钟显示。这是负责存储该图数据的模型。 http
我对 JavaScript 和 Uniswap 还很陌生。我正在使用 Uniswap V3 从 DAI/USDC 池中获取价格。我的“主要”功能如下所示: async function main()
我正在尝试弄清楚我下载的 Chrome 扩展程序是如何工作的(这是骗子用来窃取 CS:GO 元素的东西,并不重要...)。我想知道使用什么电子邮件地址(或使用什么其他通信方式)来提交被钓鱼的数据。 这
引言 今天同事问了我一个问题, System.Windows.Forms.Timer 是前台线程还是后台线程,我当时想的是它是跟着UI线程一起结束的,应该是前台线程吧? 我确实没有仔
我需要一些使用 scipy.stats.t.interval() 函数的帮助 http://docs.scipy.org/doc/scipy/reference/generated/scipy.sta
当我在 Oracle 查询计划中看到类似的内容时: HASH JOIN TABLE1 TABLE2 这两个表中的哪一个是 hashed ? Oracle 文档指的是通常被散列的“较小”
我想知道 GridSearchCV 返回的分数与按如下方式计算的 R2 指标之间的差异。在其他情况下,我收到的网格搜索分数非常负(同样适用于 cross_val_score),我将不胜感激解释它是什么
本文分享自华为云社区《 多主创新,让云数据库性能更卓越 》,作者: GaussDB 数据库。 华为《Taurus MM: bringing multi-master to the clou
我真的需要一些帮助来破译这个崩溃报告: Process: Farm Hand [616] Path: /Applications/Farm
我写了一个从 YUV_420_888 到 Bitmap 的转换,考虑到以下逻辑(据我所知): 总结该方法:内核的坐标 x 和 y 与 Y 平面(2d 分配)的非填充部分的 x 和 y 以及输出位图的
我是一名优秀的程序员,十分优秀!