gpt4 book ai didi

java - 带翻转缓冲器的无锁容器

转载 作者:行者123 更新时间:2023-12-02 10:09:42 25 4
gpt4 key购买 nike

对于我的一个项目,它必须支持并发读取和写入,我需要一个能够缓冲项目的容器,直到消费者一次获取每个当前缓冲的项目。由于生产者应该能够生成数据,无论消费者是否读取当前缓冲区,我想出了一个自定义实现,在 AtomicReference 的帮助下。 ,将每个条目添加到支持 ConcurrentLinkedQueue直到执行翻转,导致返回当前条目,同时存储具有空队列和元数据的新条目AtomicReference原子地。

我想出了一个解决方案,例如

public class FlippingDataContainer<E> {

private final AtomicReference<FlippingDataContainerEntry<E>> dataObj = new AtomicReference<>();

public FlippingDataContainer() {
dataObj.set(new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0));
}

public FlippingDataContainerEntry<E> put(E value) {
if (null != value) {
while (true) {
FlippingDataContainerEntry<E> data = dataObj.get();
FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
if (dataObj.compareAndSet(data, updated)) {
return merged;
}
}
}
return null;
}

public FlippingDataContainerEntry<E> flip() {
FlippingDataContainerEntry<E> oldData;
FlippingDataContainerEntry<E> newData = new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0);
while (true) {
oldData = dataObj.get();
if (dataObj.compareAndSet(oldData, newData)) {
return oldData;
}
}
}

public boolean isEmptry() {
return dataObj.get().getQueue().isEmpty();
}
}

由于当前值需要被推送到后备队列,所以现在需要注意。目前执行from(data, value)方法确实看起来像这样:

static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
Queue<E> queue = new ConcurrentLinkedQueue<>(data.getQueue());
queue.add(value);
return new FlippingDataContainerEntry<>(queue,
data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}

由于另一个线程可能会在该线程执行更新之前更新值而导致重试,因此我需要在每次写入尝试时复制实际队列,否则即使原子性相同,也会将条目添加到共享队列中引用无法更新。因此,简单地将值添加到共享队列可能会导致值条目被多次添加到队列中,而实际上它只应该发生一次。

复制整个队列是一项相当昂贵的任务,因此我修改了仅设置当前队列,而不是复制from(data, value)内的队列。方法,而不是将值元素添加到更新发生时执行的 block 中的共享队列中:

public FlippingDataContainerEntry<E> put(E value) {
if (null != value) {
while (true) {
FlippingDataContainerEntry<E> data = dataObj.get();
FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
if (data.compareAndSet(data, updated)) {
updated.getQueue().add(value);
return updated;
}
}
}
return null;
}

from(data, value)内我现在只设置队列,不直接添加value元素

static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
return new FlippingDataContainerEntry<>(data.getQueue(),
data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}

虽然与复制队列的代码相比,这允许运行测试的速度快 10 倍以上,但它也经常导致消耗测试失败,因为现在可能会在消费者线程翻转队列后立即将值元素添加到队列中。排队并处理数据,因此并非所有项目都被消耗。

现在的实际问题是,是否可以避免备份队列的复制以获得性能提升,同时仍然允许使用无锁算法自动更新队列的内容,从而避免中途丢失一些条目?

最佳答案

首先,让我们声明一个显而易见的事实 - 最好的解决方案是避免编写任何此类自定义类。也许像 java.util.concurrent.LinkedTransferQueue 这样简单也会同样有效,并且不太容易出错。如果 LinkedTransferQueue 不起作用,那么 LMAX disruptor 又如何呢?或者类似的东西?您是否研究过现有的解决方案?

如果您仍然需要/想要一个自定义解决方案,那么我有一个稍微不同的方法的草图,可以避免复制:

这个想法是让 put 操作围绕某个原子变量旋转,尝试设置它。如果一个线程设法设置它,那么它就获得对当前队列的独占访问权,这意味着它可以追加到该队列。追加后,它会重置原子变量以允许其他线程追加。它基本上是一个 spin-lock 。这样,线程之间的争用发生在追加到队列之前,而不是之后。

关于java - 带翻转缓冲器的无锁容器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55088878/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com