gpt4 book ai didi

java - 在单个后台线程定期修改 map 的同时读取 map

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:14:50 25 4
gpt4 key购买 nike

我有一个类,我在 updateLiveSockets() 方法中每 30 秒从一个后台线程填充一个映射 liveSocketsByDatacenter 然后我有一个方法 getNextSocket() 将由多个读取器线程调用以获取可用的实时套接字,该套接字使用相同的映射来获取此信息。

public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();

// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}

public static SocketManager getInstance() {
return Holder.instance;
}

private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}

// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
updatedLiveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(addedColoSockets));
}
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
}

private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);

SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}

// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}

// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
if (!CollectionUtils.isEmpty(endpoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
for (SocketHolder obj : endpoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}

// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());

for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);

boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
// is there any problem the way I am using `SocketHolder` class?
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(liveUpdatedSockets));
}
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
}

正如你在我的类里面看到的:

  • 从每 30 秒运行一次的单个后台线程,我在 updateLiveSockets() 方法中使用所有实时套接字填充 liveSocketsByDatacenter 映射。
  • 然后我从多个线程调用 getNextSocket() 方法给我一个可用的实时套接字,它使用 liveSocketsByDatacenter 映射来获取所需的信息。<

我的代码工作正常,没有任何问题,想看看是否有更好或更有效的方法来编写它。我还想就线程安全问题或任何竞争条件(如果有的话)发表意见,但到目前为止我还没有看到任何意见,但我可能是错的。

我最担心的是 updateLiveSockets() 方法和 getLiveSocketX() 方法。我正在迭代 liveSockets,这是 A 行 SocketHolderList,然后创建一个新的 SocketHolder 对象并添加到另一个新列表。这里可以吗?

注意: SocketHolder 是一个不可变类。你可以忽略我拥有的 ZeroMQ 东西。

最佳答案

您使用以下同步技术。

  1. 具有实时套接字数据的 map 位于原子引用之后,这允许安全地切换 map 。
  2. updateLiveSockets()方法是同步的(隐含在此),这将防止两个线程同时切换 map 。
  3. 如果在 getNextSocket() 期间发生切换,则在使用 map 时对 map 进行本地引用以避免混淆。方法。

是否像现在这样是线程安全的?

线程安全始终取决于共享可变数据上是否存在适当的同步。在这种情况下,共享的可变数据是数据中心到它们的 SocketHolder 列表的映射。

map 位于 AtomicReference 中的事实,并制作一个本地副本以供使用,在 map 上同步就足够了。您的方法采用 map 的一个版本并使用它,由于 AtomicReference 的性质,切换版本是线程安全的.这也可以通过为 map 制作成员字段来实现 volatile ,因为您所做的只是更新引用(您不对其执行任何先检查后执行的操作)。

作为scheduleAtFixedRate()保证通过 Runnable不会与自身同时运行,synchronizedupdateLiveSockets()不需要,但是,它也不会造成任何真正的伤害。

所以是的,这个类是线程安全的。

但是,尚不完全清楚 SocketHolder可以被多个线程同时使用。实际上,此类只是试图尽量减少并发使用 SocketHolder。 s 通过选择一个随机的活的(虽然不需要洗牌整个数组来选择一个随机索引)。它实际上并没有阻止并发使用。

能否提高效率?

我相信可以。查看 updateLiveSockets() 时方法,它似乎构建了完全相同的 map ,除了 SocketHolder s 对于 isLive 可能有不同的值旗帜。这使我得出结论,与其切换整个 map ,我只想切换 map 中的每个列表。为了以线程安全的方式更改映射中的条目,我可以只使用 ConcurrentHashMap .

如果我使用 ConcurrentHashMap ,并且不要切换 map ,而是 map 中的值,我可以摆脱 AtomicReference .

要更改映射,我可以构建新列表并将其直接放入 map 中。这样效率更高,因为我可以更快地发布数据,创建更少的对象,而我的同步只是建立在现成的组件上,这有利于提高可读性。

这是我的构建(为简洁起见,省略了一些不太相关的部分)

public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
private final ZContext ctx = new ZContext();

// ...

private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}

// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
}
}

// ...

// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}

// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}

// no need to make this synchronized
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);

boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;

SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
}
}

}

关于java - 在单个后台线程定期修改 map 的同时读取 map ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46997971/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com