gpt4 book ai didi

java - 以线程安全的方式从数组列表返回对象?

转载 作者:行者123 更新时间:2023-12-03 12:59:21 26 4
gpt4 key购买 nike

我有一个类,我在liveSocketsByDatacenter方法中每30秒从一个后台线程填充一个映射updateLiveSockets(),然后有一个getNextSocket()方法,该方法将由多个读取器线程调用以获取一个可用的实时套接字,该套接字使用相同的映射来获得此信息。

public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();

// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}

public static SocketManager getInstance() {
return Holder.instance;
}

private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}

// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
updatedLiveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(addedColoSockets));
}
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
}

private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);

SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}

// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}

// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
if (!CollectionUtils.isEmpty(endpoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
for (SocketHolder obj : endpoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}

// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());

for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);

boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
// is there any problem the way I am using `SocketHolder` class?
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(liveUpdatedSockets));
}
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
}

如您在我类所见:
  • 从每30秒运行一次的单个后台线程,我用liveSocketsByDatacenter方法中的所有 Activity 套接字填充updateLiveSockets()映射。
  • 然后从多个线程中,调用getNextSocket()方法为我提供一个可用的实时套接字,该套接字使用liveSocketsByDatacenter映射来获取所需的信息。

  • 我的代码工作正常,没有任何问题,并且想看看是否有更好或更有效的方式编写此代码。我还想就线程安全性问题或任何竞争条件(如果有的话)征询意见,但到目前为止我还没有看到任何问题,但我可能是错的。

    我主要担心 updateLiveSockets()方法和 getLiveSocketX()方法。我要在LINE A处迭代 liveSocketsListSocketHolder,然后制作一个新的 SocketHolder对象并添加到另一个新列表中。这可以吗?

    注意: SocketHolder是不可变的类。

    最佳答案

    代码B或C都不是线程安全的。

    代码B

    当您在enpoints列表上进行迭代以进行复制时,没有什么可以阻止另一个线程进行修改,即要添加和/或删除的元素。

    代码C

    假设endpoints不为null,则对列表对象进行三个调用:isEmptysizeget。从并发角度来看,存在几个问题:

  • 基于参数的List<SocketHolder>类型,不能保证这些方法对列表执行内部更改以传播到其他线程(内存可见性),更不用说竞争条件(如果在线程执行一个线程的同时修改了列表)此功能)。
  • 让我们假设列表endpoints提供了前面描述的保证-例如它已经用Collections.synchronizedList()包裹了。在这种情况下,仍然缺少线程安全性,因为在每次调用isEmptysizeget之间,可以在线程执行getLiveSocketX方法时修改列表。这可能会使您的代码使用列表的过时状态。例如,您可以使用endpoints.size()返回的大小,该大小不再有效,因为元素已添加到列表或从列表中删除。

  • 编辑-代码更新后

    在您提供的代码中,乍一看似乎:
  • 实际上,您实际上并没有在endpoints方法中共同修改我们之前讨论的getLiveSocketX列表,因为updateLiveSockets方法会创建一个新列表liveUpdatedSockets,您将从现有的liveSockets中填充该列表。
  • 您可以使用AtomicReferenceDatacenters映射到感兴趣的套接字列表。此AtomicReference的结果是迫使内存可见性从此映射向下到所有列表及其元素。这意味着,从副作用上来说,您可以防止“生产者”线程和“消费者”线程之间的内存不一致(分别执行updateLiveSocketsgetLiveSocket)。不过,您仍然会遇到竞争状况-假设updateLiveSocketsgetLiveSocket同时运行。考虑一个套接字S,它的状态只是从 Activity 状态切换到关闭状态。 updateLiveSockets将看到套接字S的状态为非 Activity 状态,并相应地创建一个新的SocketHolder。但是,在完全相同的时间运行的getLiveSocket将看到S处于过时状态-因为它仍将使用正在重新创建updateLiveSockets的套接字列表。
  • synchronized方法上使用的updateLiveSockets关键字在这里不能为您提供任何保证,因为代码的其他部分也都不是synchronized

  • 总而言之,我会说:
  • 编写的getLiveSocketX的代码是,而不是本身是线程安全的;
  • 但是,复制列表的方式会阻止并发修改。并且您从AtomicReference副作用中受益,可以最大程度地保证内存可见性,您可以期望在从另一个线程生成套接字后,在getNextSocket中获得一致的套接字列表;
  • 您仍然处于(2)中所述的竞争条件下,但这可能很好,具体取决于您希望赋予getLiveSocketgetNextSocket方法的规范-您可以接受getLiveSocket返回的一个套接字不可用并具有一个重试机制。

  • 综上所述,我将彻底审查并重构代码,以显示出更具可读性和明确性的线程安全的使用者/生产者模式。在使用 AtomicReference和单个 synchronized时应格外小心,在我看来,它们的用法不正确-尽管 AtomicReference确实可以如前所述为您提供帮助。

    关于java - 以线程安全的方式从数组列表返回对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47024563/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com