gpt4 book ai didi

java - 如何从不同的线程将条目填充到 map 中,然后从单个后台线程迭代 map 并发送?

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:12:09 24 4
gpt4 key购买 nike

我有一个下面的类,其中有一个 add 方法,该方法由另一个线程调用以填充我的 clientidToTimestampHolder 多映射。然后在下面的同一个类中,我启动了一个每 60 秒运行一次的后台线程,并调用一个 processData() 方法来迭代同一个 map 并将所有这些数据发送到其他服务。

public class Handler {
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final Multimap<String, Long> clientidToTimestampHolder = ArrayListMultimap.create();

private static class Holder {
private static final Handler INSTANCE = new Handler();
}

public static Handler getInstance() {
return Holder.INSTANCE;
}

private Handler() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
processData();
}
}, 0, 60, TimeUnit.SECONDS);
}

// called by another thread to populate clientidToTimestampHolder map
public void add(final String clientid, final Long timestamp) {
clientidToTimestampHolder.put(clientid, timestamp);
}

// called by background thread
public void processData() {
for (Entry<String, Collection<Long>> entry : clientidToTimestampHolder.asMap().entrySet()) {
String clientid = entry.getKey();
Collection<Long> timestamps = entry.getValue();
for (long timestamp : timestamps) {
boolean isUpdated = isUpdatedClient(clientid, timestamp);
if (!isUpdated) {
updateClient(String.valueOf(clientid));
}
}
}
}
}

我的问题是,每次从不同线程调用add 方法。那么我是否需要创建 clientidToTimestampHolder map 的副本并将该副本作为参数传递给 processData() 方法,而不是直接在该 map 上工作?

因为现在我正在使用相同的 map 在其中填充数据,然后还迭代相同的 map 以将内容发送到其他一些服务,所以我不会从该 map 中删除数据,因此这些条目将始终存在于该 map 中.

解决这个问题的最佳方法是什么?而且我需要确保它是线程安全的并且没有竞争条件,因为我不能丢失任何 clientid

更新

那么我的processData 方法将如下所示?

  public void processData() {
synchronized (clientidToTimestampHolder) {
Iterator<Map.Entry<String, Long>> i = clientidToTimestampHolder.entries().iterator();
while (i.hasNext()) {
String clientid = i.next().getKey();
long timestamp = i.next().getValue();
boolean isUpdated = isUpdatedClient(clientid, timestamp);
if (!isUpdated) {
updateClient(clientid);
}
i.remove();
}
}
}

最佳答案

使用 Multimaps.synchronized(List)Multimap 包装器具有对多重映射的线程安全引用( ArrayListMultimapListMultimap ,即将值存储在列表中):

private final ListMultimap<String, Long> clientidToTimestampHolder = 
Multimaps.synchronizedListMultimap(ArrayListMultimap.create());

请注意,同步的多图包装器有以下警告:

It is imperative that the user manually synchronize on the returned multimap when accessing any of its collection views:

// ...  

Failure to follow this advice may result in non-deterministic behavior.

在您的情况下,您必须手动同步条目 View 的迭代,因为它的迭代器未同步:

public void processData() {
synchronized (clientidToTimestampHolder) {
for (Map.Entry<String, Long> entry : clientidToTimestampHolder.entries()) {
String clientid = entry.getKey();
long timestamp = entry.getValue();
boolean isUpdated = isUpdatedClient(clientid, timestamp);
if (!isUpdated) {
updateClient(String.valueOf(clientid));
}
}
clientidToTimestampHolder.clear();
}
}

(我使用 Mutlimap.entries() 而不是 Multimap.asMap().entrySet() 因为这样更干净)。

另外,如果您想知道为什么没有通用的 ConcurrentXxxMultimap实现,参见 Guava's issue #135this comment quoting internal discussion about this :

I tried to build a general-purpose concurrent multimap, and it turned out to be slightly faster in a small fraction of uses and Much slower in most uses (compared to a synchronized multimap). I was focused on making as many operations as possible atomic; a weaker contract would eliminate some of this slowness, but would also detract from its usefulness.

I believe the Multimap interface is too "large" to support an efficient concurrent implementation - sorted or otherwise. (Clearly, this is an overstatement, but at the very least it requires either a lot of work or a loosening of the Multimap interface.)

编辑:

阅读您的评论,好像是XY Problem大部头书。话虽如此,IMO 你不应该使用 Multimap这里是因为您不使用它的任何功能,而是使用 BlockingQueue它有一个方便的 drainTo(Collection) 方法(并且是线程安全的):

private final LinkedBlockingQueue<Map.Entry<String, Long>> clientidToTimestampHolder =
new LinkedBlockingQueue<>();

public void add(final String clientid, final Long timestamp) {
clientidToTimestampHolder.offer(Maps.immutableEntry(clientid, timestamp));
}

public void processData() {
final List<Map.Entry<String, Long>> entries = new ArrayList<>();
clientidToTimestampHolder.drainTo(entries);
for (Map.Entry<String, Long> entry : entries) {
String clientid = entry.getKey();
long timestamp = entry.getValue();
boolean isUpdated = isUpdatedClient(clientid, timestamp);
if (!isUpdated) {
updateClient(String.valueOf(clientid));
}
}
}

您可以(应该?)为您的数据创建自己的值类来存储 Stringlong字段并使用它代替通用 Map.Entry<String, Long> .

关于java - 如何从不同的线程将条目填充到 map 中,然后从单个后台线程迭代 map 并发送?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42800937/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com