gpt4 book ai didi

java - 创建一个支持 "snapshots"的 ConcurrentHashMap

转载 作者:塔克拉玛干 更新时间:2023-11-02 07:49:57 24 4
gpt4 key购买 nike

我正在尝试创建一个 ConcurrentHashMap支持“快照”以提供一致的迭代器,我想知道是否有更有效的方法来做到这一点。问题在于,如果同时创建两个迭代器,那么它们需要读取相同的值,而并发 HashMap 的弱一致性迭代器的定义并不能保证这种情况。如果可能的话,我还想避免锁定: map 中有几千个值,处理每个项目需要几十毫秒,我不想在这段时间内阻止编写器,因为这可能会导致编写器阻塞一分钟或更长时间。

我目前拥有的:

  1. ConcurrentHashMap's键是字符串,它的值是 ConcurrentSkipListMap<Long, T> 的实例
  2. 当使用 putIfAbsent 将元素添加到 hashmap 时, 然后分配一个新的跳过列表,并通过 skipList.put(System.nanoTime(), t) 添加对象.
  3. 要查询 map ,我使用 map.get(key).lastEntry().getValue()返回最近的值。要查询快照(例如使用迭代器),我使用 map.get(key).lowerEntry(iteratorTimestamp).getValue() , 其中iteratorTimestampSystem.nanoTime() 的结果在初始化迭代器时调用。
  4. 如果一个对象被删除,我使用map.get(key).put(timestamp, SnapShotMap.DELETED) ,其中 DELETED 是静态最终对象。

问题:

  1. 是否有已经实现此功能的库?或者除此之外,是否有比 ConcurrentHashMap 更合适的数据结构?和 ConcurrentSkipListMap ?我的键是可比较的,所以也许某种并发树比并发哈希表更好地支持快照。
  2. 如何防止这个东西持续增长?在 X 上或之前初始化的所有迭代器完成后,我可以删除键小于 X 的所有跳过列表条目(映射中的最后一个键除外),但我不知道确定何时的好方法这已经发生了:我可以标记迭代器在它的 hasNext 时已经完成。方法返回 false,但并非所有迭代器都必须运行完成;我可以保留 WeakReference到一个迭代器,这样我就可以检测到它何时被垃圾收集,但是除了使用一个线程遍历弱引用集合然后 hibernate 几分钟之外,我想不出一种检测它的好方法 - 理想情况下线程会阻塞在 WeakReference 上并在包装引用被 GC 时收到通知,但我不认为这是一个选项。

    ConcurrentSkipListMap<Long, WeakReference<Iterator>> iteratorMap;
    while(true) {
    long latestGC = 0;
    for(Map.Entry<Long, WeakReference<Iterator>> entry : iteratorMap.entrySet()) {
    if(entry.getValue().get() == null) {
    iteratorMap.remove(entry.getKey());
    latestGC = entry.getKey();
    } else break;
    }
    // remove ConcurrentHashMap entries with timestamps less than `latestGC`
    Thread.sleep(300000); // five minutes
    }

编辑:为了澄清答案和评论中的一些困惑,我目前正在将弱一致性迭代器传递给公司另一个部门编写的代码,他们要求我增加强度迭代器的一致性。他们已经意识到我不可能制作 100% 一致的迭代器,他们只是希望我尽最大努力。他们更关心吞吐量而不是迭代器一致性,因此粗粒度锁不是一种选择。

最佳答案

您需要特殊实现的实际用例是什么?来自 ConcurrentHashMap 的 Javadoc (强调):

Retrievals reflect the results of the most recently completed update operations holding upon their onset. ... Iterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration. They do not throw ConcurrentModificationException. However, iterators are designed to be used by only one thread at a time.

因此常规 ConcurrentHashMap.values().iterator() 将为您提供一个“一致”的迭代器,但仅供单个线程一次性使用。如果您需要多次和/或通过多个线程使用相同的“快照”,我建议制作 map 的副本。

编辑:根据新信息和对“强一致性”迭代器的坚持,我提供了这个解决方案。请注意,使用 ReadWriteLock 具有以下含义:

  • 写入将被序列化(一次只有一个写入器),因此写入性能可能会受到影响。
  • 只要没有正在进行的写入,就允许并发读取,因此读取性能影响应该是最小的。
  • 活跃的读者会阻止作者,但仅限于检索对当前“快照”的引用所需的时间。线程拥有快照后,无论处理快照中的信息需要多长时间,它都不再阻塞写入器。
  • 当任何写入处于 Activity 状态时,读取器被阻塞;写入完成后,所有读者都可以访问新快照,直到新写入替换它为止。

一致性是通过序列化写入并在每次写入 上制作当前值的副本 来实现的。持有对“陈旧”快照的引用的读者可以继续使用旧快照而不用担心修改,垃圾收集器将在没有人使用旧快照时立即回收它。假定读者没有要求请求较早时间点的快照。

因为快照可能在多个并发线程之间共享,所以快照是只读的,不能修改。此限制也适用于从快照创建的任何 Iterator 实例的 remove() 方法。

import java.util.*;
import java.util.concurrent.locks.*;

public class StackOverflow16600019 <K, V> {
private final ReadWriteLock locks = new ReentrantReadWriteLock();
private final HashMap<K,V> map = new HashMap<>();
private Collection<V> valueSnapshot = Collections.emptyList();

public V put(K key, V value) {
locks.writeLock().lock();
try {
V oldValue = map.put(key, value);
updateSnapshot();
return oldValue;
} finally {
locks.writeLock().unlock();
}
}

public V remove(K key) {
locks.writeLock().lock();
try {
V removed = map.remove(key);
updateSnapshot();
return removed;
} finally {
locks.writeLock().unlock();
}
}

public Collection<V> values() {
locks.readLock().lock();
try {
return valueSnapshot; // read-only!
} finally {
locks.readLock().unlock();
}
}

/** Callers MUST hold the WRITE LOCK. */
private void updateSnapshot() {
valueSnapshot = Collections.unmodifiableCollection(
new ArrayList<V>(map.values())); // copy
}
}

关于java - 创建一个支持 "snapshots"的 ConcurrentHashMap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16600019/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com