- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在多线程环境中使用 Jeromq,如下所示。下面是我的代码,其中 SocketManager
的构造函数首先连接到所有可用的套接字,然后我将它们放入 liveSocketsByDatacenter
map 在connectToZMQSockets
方法。之后我在同一个构造函数中启动一个后台线程,它每 30 秒运行一次,它调用 updateLiveSockets
ping所有那些已经在liveSocketsByDatacenter
中的套接字的方法映射并更新 liveSocketsByDatacenter
映射这些套接字是否还活着。
和 getNextSocket()
多个读取器线程同时调用方法以获取下一个可用的套接字,然后我们使用该套接字在其上发送数据。所以我的问题是我们在多线程环境中正确使用 Jeromq 吗?因为当我们尝试将数据发送到该实时套接字时,我们刚刚在生产环境中使用此堆栈跟踪看到异常,所以我不确定这是错误还是其他原因?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 30 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
getNextSocket()
SocketManager
的方法从多个读取器线程同时进行类:
// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}
public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
final boolean messageA) {
ZMsg msg = new ZMsg();
msg.add(reco);
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(addr, reco);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket, true);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// ...
return sent;
}
getNextSocket()
可以返回
0MQ socket
至
thread A
.同时,定时器线程可以访问相同的
0MQ socket
ping它。在这种情况下
thread A
和计时器线程正在发生相同的变化
0MQ socket
,这会导致问题。那么解决此问题的最佳和有效方法是什么?
ArrayIndexOutOfBoundsException
但这次它的 71 行号在
"YQueue"
文件。唯一一致的始终是 256。所以肯定有一些与 256 相关的东西,我无法弄清楚这里的 256 是什么?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.backPos(YQueue.java:71)
at zmq.YPipe.write(YPipe.java:51)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
最佳答案
事实 #0:ZeroMQ 不是线程安全的——根据定义
虽然 ZeroMQ 文档和 Pieter HINTJENS 的优秀著作“Code Connected. Volume 1”并没有忘记尽可能提醒这一事实,但在线程之间返回甚至共享 ZeroMQ 套接字实例的想法时常出现。当然,类实例的方法可能会在它们的内部方法和属性中提供这种几乎“隐藏”的东西,但是适当的设计努力应该防止任何此类副作用,没有异常(exception),没有任何借口。
如果有定量事实的合理支持,共享可能是 zmq.Context()
的常见实例的一种方式。 ,但清晰的分布式系统设计可能存在于真正的多代理方案中,其中每个代理运行自己的 Context()
-引擎,根据配置和性能偏好的各自组合进行微调。
那么解决此问题的最佳和有效方法是什么?
永远不要共享一个 ZeroMQ 套接字。从来没有,确实。即使最新的发展开始 promise 在不久的将来会在这个方向上发生一些变化。用共享污染任何高性能、低延迟的分布式系统设计都是一个坏习惯。不共享是该领域的最佳设计原则。
是的,我可以看到我们不应该在线程之间共享套接字,但是在我的代码中,您认为解决此问题的最佳方法是什么?
是的,解决这个问题的最好和有效的方法是永远不要共享一个 ZeroMQ 套接字。
这意味着永远不要返回任何对象,其属性是 ZeroMQ 套接字(您主动构建并从 .connect(){...}
类方法中大量返回。在您的情况下,所有类方法似乎保留 private
,这可能融合了允许“其他线程”接触类私有(private)套接字实例的问题,但同样的原则也必须在所有属性级别上背书,以便是有效的。最后,这种“融合”走捷径,被
public static SocketManager getInstance()
,
它混杂地提供任何外部请求者来直接访问共享 ZeroMQ 套接字的类私有(private)实例。
如果某些文档几乎在每一章中都明确警告不要共享事物,那么人们宁可不要共享事物。
所以,重新设计方法,使SocketManager
获得更多功能,因为它是类方法,它将执行嵌入的必备功能,以便 如 ZeroMQ 出版物中所述,明确防止任何外部世界线程接触不可共享的实例。
接下来是资源 list :您的代码似乎每 30 秒重新检查所有感兴趣的数据中心中的世界状态。这实际上每分钟创建两次新的 List 对象。虽然你可能会投机让java垃圾收集器来整理所有 thrash,这不是从任何地方进一步引用的,对于与 ZeroMQ 相关的对象来说,这不是一个好主意,从您之前的重新检查运行中嵌入到 List-s 中。 ZeroMQ 对象仍然从 Zcontext()
内部引用- ZeroMQ Context()
-core-factory 实例化了 I/O 线程,也可以看作是 ZeroMQ 套接字库存资源管理器。所以,所有的 new
-created socket-instances 不仅获得了来自 java
的外部句柄侧面,也是内部 handle ,来自 (Z)Context()
内部.到现在为止还挺好。但是在代码中的任何地方都没有看到任何方法,这些方法会取消对象实例中的任何和所有 ZeroMQ 套接字,这些套接字已与 java
解除关联。 -side,但仍然引用自 (Z)Context()
-边。分配资源的显式资源退役是一种公平的设计方面的做法,对于有限或以其他方式受到约束的资源而言更是如此。 { "cheap"| 执行此操作的方式可能有所不同。 “昂贵的”} - 这种资源管理处理的维护成本(ZeroMQ 套接字实例作为一些轻量级的“消耗品/一次性”来处理是非常昂贵的......但那是另一回事了)。
所以,再加上一套适当的资源再利用/资源拆解方法,就可以得到new
的总量。 -创建的套接字回到您的控制之下(您的代码负责 (Z)Context()
-domain-of-resources-control 内有多少套接字处理程序可能会被创建并且必须保持被管理- - 不管是有意还是无意)。
有人可能会反对自动检测和(可能很好地延迟)垃圾收集可能有一些“ promise ”,但是,您的代码仍然负责适当的资源管理,即使 LMAX 人员也永远不会获得如此勇敢的性能,如果他们依赖来自标准 gc 的“ promise ”。您的问题比 LMAX 顶级性能必须解决的问题严重得多。您的代码(到目前为止已发布)对 没有任何作用.close()
和 .term()
ZeroMQ 相关的资源。在一个消费不受控制(分布式需求)的生态系统中,这是一种完全不可能的做法。 您必须保护您的船,以免超出您知道它可以安全处理的极限并动态卸载在“对岸”没有收件人的每个盒子。
那是船长的 (您的代码设计师的)责任。
没有明确告诉最低级别的库存管理负责人( ZeroMQ Context()
-floor )有些箱子要卸载,问题仍然是你的。标准 gc
-chain-of-command 不会“自动”执行此操作,无论“ promise ”看起来像什么,它都不会。因此,对您的 ZeroMQ 资源管理要明确,通过对这些步骤进行排序来评估返回代码,并适当处理在您的代码明确控制下执行这些资源管理操作所引发的任何和所有异常。
更低(如果不是可达到的最低值)资源利用率 - 信封和更高(如果不是最高的)性能 是正确完成这项工作的奖励。 LMAX 家伙是一个很好的例子,在这方面做得非常好,远远超出了标准的 Java“ promise ”,所以人们可以向最好的人学习。
声明的调用签名与使用的调用签名似乎不匹配:
虽然我在这一点上可能是错的,因为我的大部分设计工作都不是在 java多态调用接口(interface),签名中似乎存在不匹配,发布为:
private List<SocketHolder> connect( Datacenters dc, // 1-st
List<String> addresses, // 2-nd
int socketType // 3-rd
) {
... /* implementation */
}
connectToZMQSockets()
中调用方法只是通过:
List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
ZMQ.PUSH // 2-nd
);
关于java.lang.ArrayIndexOutOfBoundsException : 256 with jeromq 0. 3.6 版本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47084567/
我正在编写一个具有以下签名的 Java 方法。 void Logger(Method method, Object[] args); 如果一个方法(例如 ABC() )调用此方法 Logger,它应该
我是 Java 新手。 我的问题是我的 Java 程序找不到我试图用作的图像文件一个 JButton。 (目前这段代码什么也没做,因为我只是得到了想要的外观第一的)。这是我的主课 代码: packag
好的,今天我在接受采访,我已经编写 Java 代码多年了。采访中说“Java 垃圾收集是一个棘手的问题,我有几个 friend 一直在努力弄清楚。你在这方面做得怎么样?”。她是想骗我吗?还是我的一生都
我的 friend 给了我一个谜语让我解开。它是这样的: There are 100 people. Each one of them, in his turn, does the following
如果我将使用 Java 5 代码的应用程序编译成字节码,生成的 .class 文件是否能够在 Java 1.4 下运行? 如果后者可以工作并且我正在尝试在我的 Java 1.4 应用程序中使用 Jav
有关于why Java doesn't support unsigned types的问题以及一些关于处理无符号类型的问题。我做了一些搜索,似乎 Scala 也不支持无符号数据类型。限制是Java和S
我只是想知道在一个 java 版本中生成的字节码是否可以在其他 java 版本上运行 最佳答案 通常,字节码无需修改即可在 较新 版本的 Java 上运行。它不会在旧版本上运行,除非您使用特殊参数 (
我有一个关于在命令提示符下执行 java 程序的基本问题。 在某些机器上我们需要指定 -cp 。 (类路径)同时执行java程序 (test为java文件名与.class文件存在于同一目录下) jav
我已经阅读 StackOverflow 有一段时间了,现在我才鼓起勇气提出问题。我今年 20 岁,目前在我的家乡(罗马尼亚克卢日-纳波卡)就读 IT 大学。足以介绍:D。 基本上,我有一家提供簿记应用
我有 public JSONObject parseXML(String xml) { JSONObject jsonObject = XML.toJSONObject(xml); r
我已经在 Java 中实现了带有动态类型的简单解释语言。不幸的是我遇到了以下问题。测试时如下代码: def main() { def ks = Map[[1, 2]].keySet()
一直提示输入 1 到 10 的数字 - 结果应将 st、rd、th 和 nd 添加到数字中。编写一个程序,提示用户输入 1 到 10 之间的任意整数,然后以序数形式显示该整数并附加后缀。 public
我有这个 DownloadFile.java 并按预期下载该文件: import java.io.*; import java.net.URL; public class DownloadFile {
我想在 GUI 上添加延迟。我放置了 2 个 for 循环,然后重新绘制了一个标签,但这 2 个 for 循环一个接一个地执行,并且标签被重新绘制到最后一个。 我能做什么? for(int i=0;
我正在对对象 Student 的列表项进行一些测试,但是我更喜欢在 java 类对象中创建硬编码列表,然后从那里提取数据,而不是连接到数据库并在结果集中选择记录。然而,自从我这样做以来已经很长时间了,
我知道对象创建分为三个部分: 声明 实例化 初始化 classA{} classB extends classA{} classA obj = new classB(1,1); 实例化 它必须使用
我有兴趣使用 GPRS 构建车辆跟踪系统。但是,我有一些问题要问以前做过此操作的人: GPRS 是最好的技术吗?人们意识到任何问题吗? 我计划使用 Java/Java EE - 有更好的技术吗? 如果
我可以通过递归方法反转数组,例如:数组={1,2,3,4,5} 数组结果={5,4,3,2,1}但我的结果是相同的数组,我不知道为什么,请帮助我。 public class Recursion { p
有这样的标准方式吗? 包括 Java源代码-测试代码- Ant 或 Maven联合单元持续集成(可能是巡航控制)ClearCase 版本控制工具部署到应用服务器 最后我希望有一个自动构建和集成环境。
我什至不知道这是否可能,我非常怀疑它是否可能,但如果可以,您能告诉我怎么做吗?我只是想知道如何从打印机打印一些文本。 有什么想法吗? 最佳答案 这里有更简单的事情。 import javax.swin
我是一名优秀的程序员,十分优秀!