- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我有一个类,其中我每 30 秒从一个后台线程填充一个 map liveSocketsByDatacenter
,然后我有一个方法 getNextSocket
,它将被多个调用阅读器线程获取可用的实时套接字,该套接字使用相同的映射来获取此信息。
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>();
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
public Optional<SocketHolder> getNextSocket() {
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
private void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
boolean status = SendToSocket.getInstance().execute(3, holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets);
}
}
}
正如你在我上面的类(class)中看到的那样:
liveSocketsByDatacenter
映射。getNextSocket
方法来为我提供可用的实时套接字,它使用 liveSocketsByDatacenter
映射来获取所需的信息。我上面的代码线程安全吗?所有的阅读器线程都能准确地看到 liveSocketsByDatacenter
吗?由于我每 30 秒从一个后台线程修改一次 liveSocketsByDatacenter
映射,然后从许多读取器线程修改一次,因此我调用了 getNextSocket
方法,所以我不确定我是否这样做了这里有什么问题。
看起来我的“getLiveSocket”方法中可能存在线程安全问题,因为每次读取都会从 map 中获取一个共享的 ArrayList
并将其洗牌?而且可能还有一些我可能错过的地方。在我的代码中修复这些线程安全问题的最佳方法是什么?
如果有任何更好的方法来重写它,那么我也愿意接受。
最佳答案
为了线程安全,您的代码必须同步对所有共享可变状态的任何访问。
给大家分享liveSocketsByDatacenter
, HashMap
的实例Map
的非线程安全 实现可以同时读取(通过 updateLiveSockets
和 getNextSocket
)和修改(通过 connectToZMQSockets
和 updateLiveSockets
)而不同步任何已经足以使您的代码非线程安全的访问。此外,这个 Map
的值是 ArrayList
的实例List
的非线程安全 实现也可以同时读取(由 getNextSocket
和 updateLiveSockets
)和修改(由 getLiveSocket
更准确地由 Collections.shuffle
)。
解决 2 线程安全问题的简单方法可能是:
ConcurrentHashMap
而不是 HashMap
对于你的变量 liveSocketsByDatacenter
因为它是 Map
的 native 线程安全实现.ArrayList
的不可修改 版本放入使用 Collections.unmodifiableList(List<? extends T> list)
作为 map 值的实例,那么您的列表将是不可变的,因此线程安全。 例如:
liveSocketsByDatacenter.put(
entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)
);`
getLiveSocket
避免调用 Collections.shuffle
直接在您的列表上,例如,您可以仅随机播放 Activity 套接字列表而不是所有套接字,或者使用列表的副本(例如 new ArrayList<>(listOfEndPoints)
)而不是列表本身。例如:
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
对于 #1,因为您似乎经常阅读并且很少(每 30 秒一次)修改您的 map ,您可以考虑重建您的 map ,然后每 30 秒共享其不可变版本(使用 Collections.unmodifiableMap(Map<? extends K,? extends V> m)
),这种方法是在大部分阅读场景中非常高效,因为您不再需要为访问 map 内容而支付任何同步机制的费用。
您的代码将是:
// Your variable is no more final, it is now volatile to ensure that all
// threads will see the same thing at all time by getting it from
// the main memory instead of the CPU cache
private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter
= Collections.unmodifiableMap(new HashMap<>());
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry :
socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(
entry.getKey(), entry.getValue(), ZMQ.PUSH
);
liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter;
...
}
...
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter);
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
...
liveSockets.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
}
你的领域liveSocketsByDatacenter
也可以是 AtomicReference<Map<Datacenters, List<SocketHolder>>>
类型, 那么它将是 final
,您的 map 仍将存储在 volatile
中变量但在类内 AtomicReference
.
之前的代码将是:
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
= new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
...
private void connectToZMQSockets() {
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
...
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
关于java - 从单个线程修改 HashMap 并从多个线程读取?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41952171/
前言: 有时候,一个数据库有多个帐号,包括数据库管理员,开发人员,运维支撑人员等,可能有很多帐号都有比较大的权限,例如DDL操作权限(创建,修改,删除存储过程,创建,修改,删除表等),账户多了,管理
这个问题已经有答案了: Condition variable deadlock (2 个回答) 已关闭 5 年前。 在研究多线程时,我编写了以下代码,但在屏幕上没有观察到输出。我在这里做错了什么?我期
复制代码 代码如下: <IfModule mod_rewrite.c> RewriteEngineOn RewriteBase/ #将www.zzvips.com跳转到www.zzv
复制代码 代码如下: <IfModule mod_rewrite.c> RewriteEngine On RewriteBase / # 把 www.zzvips.com
复制代码 代码如下: Const T_GATEWAY = "1.1.1.1" '网关 Const T_NEWDNS1 = "2.2.2.2" 'DNS1
0. 修改索引 大文本字段支持排序 PUT http://localhost:9200/lrc_blog/_mapping //请求体 { "properties": { "title": { "t
仅 react 当状态发生变化时重新渲染 . 那么为什么我会直接看到我对真实 DOM 所做的更改呢? 我知道我正在修改真实的 DOM,但是当我根本没有改变状态时触发重新渲染的是什么。 import R
Xcode beta 5 推出 @FetchRequest对于 SwiftUI。 我有一个 View ,它有一个 @FetchRequest . NSFetchRequest是在管理器中创建的,该管理
关闭。这个问题需要更多 focused .它目前不接受答案。 想改进这个问题?更新问题,使其仅关注一个问题 editing this post . 7年前关闭。 Improve this questi
我有一个表达式[text][id]应替换为链接 text 解决方案是( id 是整数) $s = preg_replace("/\[([^\]]+)(\]*)\]\[([0-9]+)\]/","$1$
我在 repo 中有一个文件,我不想让任何人更新。 我能做什么? 最佳答案 你想要svn锁:http://www.linxit.de/svnbook/en/1.2/svn.ref.svn.c.lock
说我有项目 list 。我想导出到csv,但在此之前我想做一些计算/修改。 基本上,设置如下所示: PS C:\Files> gci Directory: C:\Files Mode
我有一个非常简单的问题 - 是否可以修改 Java API 的源代码,例如Junit,JABX ? 我知道这似乎是一个非常愚蠢的问题,但它一直困扰着我一段时间。 最佳答案 如果您可以掌握源代码,那么请
我有一个带有变量/列的小标题,其中包括不同形状的小标题列表。我想为其中一个变量中的每个(子)标题添加一个变量/列。 例如此类数据 library("tibble") aaa aaa # A tibb
我有几个菜单,可以在单击时向当前链接添加变量。这是一个例子: 1 2 3 x y z 我的问题是,如果我选择“y”2次,它会添加“&cord=y”2次。相反,我希望它替
我有两个项目:一个服务项目和一个服务安装程序项目。服务项目具有适合我的产品的装配信息。它包括公司信息和正确的服务名称。一旦服务实际安装,所有这些似乎都会被忽略。安装服务时,它使用在服务安装程序的ini
以下代码何时可能产生副作用? @some = map { s/xxx/y/; $_ } @some; perlcritic 将其解释为危险的,因为例如: @other = map { s/xxx/y/
我想知道以下哪种解决方案更好:我想修改一些 .class 文件,我意识到有两种方法可以做到这一点: 反编译.class文件,修改它,最后再次编译。 - 直接用十六进制编辑器修改。 谢谢 最佳答案 在这
这是我的按钮代码 onclick 我希望我的程序等待用户单击一个 JPanel,并且当用户单击 JPanel 时,它应该在控制台上打印其名称。 此按钮代码未显示输出 JPopupMenu popu
我正在使用一个具有“getName()”方法的特定 API。 getName() 返回一个字符串。是否可以修改该字符串? API 中不包含修饰符方法,并且 String getName() 返回的是私
我是一名优秀的程序员,十分优秀!