gpt4 book ai didi

java - 以线程安全的方式填充 map ,并将该 map 从后台线程传递给另一个方法?

转载 作者:行者123 更新时间:2023-12-03 12:59:34 26 4
gpt4 key购买 nike

我有一个下面的类,其中多个线程将调用 add 方法来以线程安全的方式填充 channelMessageHolder CHM。

在同一个类中,我有一个backgrond线程,该线程每30秒运行一次,它通过传递 send 的数据来调用 channelMessageHolder 方法。

public class Processor {
private final ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder =
new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());

private Processor() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
send();
}
}, 0, 30, TimeUnit.SECONDS);
}

// this will be called by only single background thread
private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) {
for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
Channel channel = entry.getKey();
ConcurrentLinkedQueue<Message> messageHolder = entry.getValue();

while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();
....
// process this and send to database
}
}
}

// called by multiple threads
public void add(final Channel channel, final Message message) {
// populate channelMessageHolder in a thread safe way
}
}

问题

如您所见, channelMessageHolder类中已经存在 Processor,所以我是否需要每30秒显式传递一次来自此映射的数据以发送方法?或者我可以在send方法中直接使用它?

令人困惑的是,如果我直接在send方法中使用它,那么它将同时由多个线程填充,因此这就是为什么我使用AtomicReference的 getAndSet方法将其传递给 send方法的原因。

让我知道我在做什么是错误的,还有更好的方法吗?

最佳答案

As you can see channelMessageHolder is already present in my Processor class so do I need to explicitly pass data from this map every 30 seconds to send method? Or I can directly use it in my send method?



您当然可以直接在 send()方法中使用它,并且由于 AtomicReference已经同步,因此不需要 ConcurrentHashMap包装器。您需要担心的是,映射中的键和值对象已正确同步。我假设 Channel是不可变的,并且 ConcurrentLinkedQueue是并发的,所以您应该不错。
// no need for AtomicReference
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder =
new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>();
ConcurrentHashMap负责您的同步,因此生产者线程可以在发送方线程发送项目的同时将项目添加到其中,而不会发生冲突。仅当您试图在多个线程之间共享不同步的类时,才需要 AtomicReference

Confusion is, if I directly use it in my send method, then it will be populated by multiple threads at the same time so that's why I am using getAndSet method of AtomicReference to pass it to send method.



是的,但是还可以。多个线程将消息添加到 ConcurrentLinkedQueue。您的后台线程每30秒启动一次,获取 Channel,出队,然后发送此时在队列中的消息。 ConcurrentLinkedQueue可以防止生产者和消费者的竞争条件。

您的代码中存在的问题是,它不是可重入的,因为它依赖于对队列的多次调用:
while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();

它适用于您的情况,因为看起来只有一个线程出队,但是以下代码更好:
while (true) {
// only one call to the concurrent queue
Message message = messageHolder.poll();
if (message == null) {
break;
}
...
}

关于java - 以线程安全的方式填充 map ,并将该 map 从后台线程传递给另一个方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42037894/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com