gpt4 book ai didi

java - 为什么我的 Disruptor 程序没有充分利用环形缓冲区

转载 作者:行者123 更新时间:2023-11-29 04:22:18 28 4
gpt4 key购买 nike

Disruptor github地址为:https://github.com/LMAX-Exchange/disruptor

我有一个简单的测试如下:

public class DisruptorMain {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
class Element {

private int value;

public int get() {
return value;
}

public void set(int value) {
this.value = value;
}

}

ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};

EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};

EventHandler<Element> handler = new EventHandler<Element>() {
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch) {
try {
Thread.sleep(1000 * sequence);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Element: " + element.get());
}
};

BlockingWaitStrategy strategy = new BlockingWaitStrategy();

int bufferSize = 4;

Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

disruptor.handleEventsWith(handler);

disruptor.start();

RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

for (int l = 0; l < 8; l++) {
long sequence = ringBuffer.next();
System.out.println("sequence:" + sequence);

try {
Element event = ringBuffer.get(sequence);
event.set(l);
} finally {
ringBuffer.publish(sequence);
}
}
}
}

结果是:顺序:0顺序:1顺序:2顺序:3元素:0元素:1元素:2元素:3顺序:4顺序:5顺序:6顺序:7元素:4元素:5元素:6元素:7

在我的测试中,我定义了一个大小为 4 的 ringbuffer,我有一个生产者为其创建 8 个任务,我的问题是,当生产者将 4 个任务放入 ringbuffer 时,消费者开始接受任务从ringbuffer开始工作,任务1完成后,ringbuffer应该有一个空的空间给task 5,但是结果显示,只有当ringbuffer中的所有任务都完成后,ringbuffer才能接受新的任务,为什么?

最佳答案

这是因为 Disruptor 将批处理事件处理程序。如果事件处理程序很慢或环形缓冲区很小,则批大小通常可以是环形缓冲区的大小。 Disruptor 只会更新该事件处理程序的处理序列,直到批处理完成。这减少了它需要对发布者用来确定空间是否可用的序列变量进行更新的次数。如果您需要在默认设置之前提供可用空间,那么您可以使用 SequenceReportingEventHandler 来实现。

public class MyEventHandler implements SequenceReportingEventHandler<Element> {
Sequence processedSequence;

public void setSequenceCallback(Sequence s) {
processedSequence = s;
}

public void onEvent(Element e, long sequence, boolean endOfBatch) {
// Do stuff
processedSequence.set(sequence);
}
}

关于java - 为什么我的 Disruptor 程序没有充分利用环形缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48352411/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com