- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在多线程环境中使用 Jeromq,如下所示。下面是我的代码,其中 SocketManager
的构造函数首先连接到所有可用的套接字,然后我将它们放入 liveSocketsByDatacenter
map 在connectToZMQSockets
方法。之后我在同一个构造函数中启动一个后台线程,它每 30 秒运行一次,它调用 updateLiveSockets
ping所有那些已经在liveSocketsByDatacenter
中的套接字的方法映射并更新 liveSocketsByDatacenter
映射这些套接字是否还活着。
和 getNextSocket()
多个读取器线程同时调用方法以获取下一个可用的套接字,然后我们使用该套接字在其上发送数据。所以我的问题是我们在多线程环境中正确使用 Jeromq 吗?因为当我们尝试将数据发送到该实时套接字时,我们刚刚在生产环境中使用此堆栈跟踪看到异常,所以我不确定这是错误还是其他原因?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 30 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
getNextSocket()
SocketManager
的方法从多个读取器线程同时进行类:
// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}
public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
final boolean messageA) {
ZMsg msg = new ZMsg();
msg.add(reco);
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(addr, reco);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket, true);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// ...
return sent;
}
getNextSocket()
可以返回
0MQ socket
至
thread A
.同时,定时器线程可以访问相同的
0MQ socket
ping它。在这种情况下
thread A
和计时器线程正在发生相同的变化
0MQ socket
,这会导致问题。那么解决此问题的最佳和有效方法是什么?
ArrayIndexOutOfBoundsException
但这次它的 71 行号在
"YQueue"
文件。唯一一致的始终是 256。所以肯定有一些与 256 相关的东西,我无法弄清楚这里的 256 是什么?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.backPos(YQueue.java:71)
at zmq.YPipe.write(YPipe.java:51)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
最佳答案
事实 #0:ZeroMQ 不是线程安全的——根据定义
虽然 ZeroMQ 文档和 Pieter HINTJENS 的优秀著作“Code Connected. Volume 1”并没有忘记尽可能提醒这一事实,但在线程之间返回甚至共享 ZeroMQ 套接字实例的想法时常出现。当然,类实例的方法可能会在它们的内部方法和属性中提供这种几乎“隐藏”的东西,但是适当的设计努力应该防止任何此类副作用,没有异常(exception),没有任何借口。
如果有定量事实的合理支持,共享可能是 zmq.Context()
的常见实例的一种方式。 ,但清晰的分布式系统设计可能存在于真正的多代理方案中,其中每个代理运行自己的 Context()
-引擎,根据配置和性能偏好的各自组合进行微调。
那么解决此问题的最佳和有效方法是什么?
永远不要共享一个 ZeroMQ 套接字。从来没有,确实。即使最新的发展开始 promise 在不久的将来会在这个方向上发生一些变化。用共享污染任何高性能、低延迟的分布式系统设计都是一个坏习惯。不共享是该领域的最佳设计原则。
是的,我可以看到我们不应该在线程之间共享套接字,但是在我的代码中,您认为解决此问题的最佳方法是什么?
是的,解决这个问题的最好和有效的方法是永远不要共享一个 ZeroMQ 套接字。
这意味着永远不要返回任何对象,其属性是 ZeroMQ 套接字(您主动构建并从 .connect(){...}
类方法中大量返回。在您的情况下,所有类方法似乎保留 private
,这可能融合了允许“其他线程”接触类私有(private)套接字实例的问题,但同样的原则也必须在所有属性级别上背书,以便是有效的。最后,这种“融合”走捷径,被
public static SocketManager getInstance()
,
它混杂地提供任何外部请求者来直接访问共享 ZeroMQ 套接字的类私有(private)实例。
如果某些文档几乎在每一章中都明确警告不要共享事物,那么人们宁可不要共享事物。
所以,重新设计方法,使SocketManager
获得更多功能,因为它是类方法,它将执行嵌入的必备功能,以便 如 ZeroMQ 出版物中所述,明确防止任何外部世界线程接触不可共享的实例。
接下来是资源 list :您的代码似乎每 30 秒重新检查所有感兴趣的数据中心中的世界状态。这实际上每分钟创建两次新的 List 对象。虽然你可能会投机让java垃圾收集器来整理所有 thrash,这不是从任何地方进一步引用的,对于与 ZeroMQ 相关的对象来说,这不是一个好主意,从您之前的重新检查运行中嵌入到 List-s 中。 ZeroMQ 对象仍然从 Zcontext()
内部引用- ZeroMQ Context()
-core-factory 实例化了 I/O 线程,也可以看作是 ZeroMQ 套接字库存资源管理器。所以,所有的 new
-created socket-instances 不仅获得了来自 java
的外部句柄侧面,也是内部 handle ,来自 (Z)Context()
内部.到现在为止还挺好。但是在代码中的任何地方都没有看到任何方法,这些方法会取消对象实例中的任何和所有 ZeroMQ 套接字,这些套接字已与 java
解除关联。 -side,但仍然引用自 (Z)Context()
-边。分配资源的显式资源退役是一种公平的设计方面的做法,对于有限或以其他方式受到约束的资源而言更是如此。 { "cheap"| 执行此操作的方式可能有所不同。 “昂贵的”} - 这种资源管理处理的维护成本(ZeroMQ 套接字实例作为一些轻量级的“消耗品/一次性”来处理是非常昂贵的......但那是另一回事了)。
所以,再加上一套适当的资源再利用/资源拆解方法,就可以得到new
的总量。 -创建的套接字回到您的控制之下(您的代码负责 (Z)Context()
-domain-of-resources-control 内有多少套接字处理程序可能会被创建并且必须保持被管理- - 不管是有意还是无意)。
有人可能会反对自动检测和(可能很好地延迟)垃圾收集可能有一些“ promise ”,但是,您的代码仍然负责适当的资源管理,即使 LMAX 人员也永远不会获得如此勇敢的性能,如果他们依赖来自标准 gc 的“ promise ”。您的问题比 LMAX 顶级性能必须解决的问题严重得多。您的代码(到目前为止已发布)对 没有任何作用.close()
和 .term()
ZeroMQ 相关的资源。在一个消费不受控制(分布式需求)的生态系统中,这是一种完全不可能的做法。 您必须保护您的船,以免超出您知道它可以安全处理的极限并动态卸载在“对岸”没有收件人的每个盒子。
那是船长的 (您的代码设计师的)责任。
没有明确告诉最低级别的库存管理负责人( ZeroMQ Context()
-floor )有些箱子要卸载,问题仍然是你的。标准 gc
-chain-of-command 不会“自动”执行此操作,无论“ promise ”看起来像什么,它都不会。因此,对您的 ZeroMQ 资源管理要明确,通过对这些步骤进行排序来评估返回代码,并适当处理在您的代码明确控制下执行这些资源管理操作所引发的任何和所有异常。
更低(如果不是可达到的最低值)资源利用率 - 信封和更高(如果不是最高的)性能 是正确完成这项工作的奖励。 LMAX 家伙是一个很好的例子,在这方面做得非常好,远远超出了标准的 Java“ promise ”,所以人们可以向最好的人学习。
声明的调用签名与使用的调用签名似乎不匹配:
虽然我在这一点上可能是错的,因为我的大部分设计工作都不是在 java多态调用接口(interface),签名中似乎存在不匹配,发布为:
private List<SocketHolder> connect( Datacenters dc, // 1-st
List<String> addresses, // 2-nd
int socketType // 3-rd
) {
... /* implementation */
}
connectToZMQSockets()
中调用方法只是通过:
List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
ZMQ.PUSH // 2-nd
);
关于java.lang.ArrayIndexOutOfBoundsException : 256 with jeromq 0. 3.6 版本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47084567/
无法使用 Hive 版本 1.1.0 HBase 版本 0.94.8 和 hadoop 版本 2.7.0 从 hive 创建 Hbase 表 hive (default)> CREATE TABLE
我试图为 electron app 创建可执行文件但面临这个问题 Unable to determine Electron version. Please specify an Electron ve
我正在尝试让自适应阈值在 python 绑定(bind)到 opencv 中工作(swig 一个 - 无法让 opencv 2.0 工作,因为我正在使用 beagleboard 因为交叉编译还没有工作
我一直在 linux 机器上使用 JMeter,在命令行下使用了一段时间。工作正常。 今天,我在 Windows 机器(新客户端等)上尝试了它,它确实可以工作,但在控制台窗口中输出有很大不同。 Lin
在我的编码环境中,我通常使用最新版本的 Java 和 Eclipse。当我编写源代码时,我不会注意我使用的 API 方法或类是否向后兼容旧版本的 Java 或 Eclipse。在 javadoc 中存
问题是关于版本的特定组合,但更普遍。 我刚刚从 Kubuntu 12.04 升级到 14.04。现在,当我想编译 CUDA 代码(使用 CUDA 6.5)时,我得到: #error -- unsupp
我目前正在对我的一些应用程序进行沙箱处理,看来我必须删除一些功能才能满足 Mac App Store 沙箱(和其他)规则。 显然用户不会因为失去功能而感到高兴,我担心他们不会指责苹果制定了愚蠢的规则,
我用 flash 和 js 版本创建了一个动画横幅。 是否可以检测低于版本 9 的 ie 版本,然后提供 Flash 横幅,否则提供 js 横幅。 最佳答案 您可以使用条件注释来检测 IE 版本
我有一个处理不同位置的数据库的应用程序,我想检查这些数据库是否使用 Firebird 2.5 或更高版本打开。我们最近从 Firebird 2.0 迁移到了 2.5,我们有很多数据库可以响应 sele
我正在开发一个应用程序,我使用托管在我的服务器上的 Java 和 Jersey 构建了后端部分。我在服务器上使用 Tomcat7 来调用 Web 服务。 我以前有一台安装了 Ubuntu 的计算机,我
我可以使用 GetVersionEx() 函数来获取 Windows 版本,但是这个函数将返回一个数字而不是一个字符串。但是没有问题,因为我可以将数字转换为字符串,例如: if (osvi.dwMaj
我已经在我的系统中安装了 Anaconda 2 & 3。 Anaconda 2 包含 python 2.7 & Anaconda 3 包含 python 3.6。 我需要使用命令提示符运行我的 pyt
我正在尝试构建一个 Android 项目,但发生了以下错误 Error:(10, 1) A problem occurred evaluating project ':app'. > Failed t
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 4 年前。 Improve this qu
在降级我的 GCC 之前,我想知道是否有办法确定我的机器中的哪些程序/框架或依赖项会中断,以及是否有更好的方法来执行 openpose 安装? (例如,在 CMake 中更改某些内容) 有没有办法在不
我已经在终端的代码sudo apt-get install Shadowsocks-qt5中安装了Shadowsocks-Qt5,然后我可以通过搜索找到启动图标,但是它当我点击图标时打不开。然后我尝试
在网络上找到的文档说,MLLP V2(第 2 版)是用于传输 HL7 版本 3 内容的所有消息传输协议(protocol)的要求。似乎 MLLP 第 2 版主要用于 HL7 第 3 版。 我们可以/应
我正在使用带有 selinium webdriver 的 Protractor 。我的chromeDriver版本是78.0.1,chrome版本是78.0.3904.97。两个版本都匹配,应该不会有
我正在按照教程设置 mysql 数据库并做一些事情。我无法找到数据库资源管理器。我读了很多,但在 Window->show View-> Dataxxx 或右侧上部选项卡中无法正常工作。 最佳答案 从
我已经在 KDE 桌面上安装了 Anaconda 2.0.1。当我运行 python 并看到所有已安装的模块时,我收到此消息“无法将不兼容的 Qt 库(版本 0x40801)与该库(版本 0x4080
我是一名优秀的程序员,十分优秀!