gpt4 book ai didi

java.lang.ArrayIndexOutOfBoundsException : 256 with jeromq 0. 3.6 版本

转载 作者:行者123 更新时间:2023-12-02 03:52:31 25 4
gpt4 key购买 nike

我在多线程环境中使用 Jeromq,如下所示。下面是我的代码,其中 SocketManager 的构造函数首先连接到所有可用的套接字,然后我将它们放入 liveSocketsByDatacenter map 在connectToZMQSockets方法。之后我在同一个构造函数中启动一个后台线程,它每 30 秒运行一次,它调用 updateLiveSockets ping所有那些已经在liveSocketsByDatacenter中的套接字的方法映射并更新 liveSocketsByDatacenter映射这些套接字是否还活着。

getNextSocket()多个读取器线程同时调用方法以获取下一个可用的套接字,然后我们使用该套接字在其上发送数据。所以我的问题是我们在多线程环境中正确使用 Jeromq 吗?因为当我们尝试将数据发送到该实时套接字时,我们刚刚在生产环境中使用此堆栈跟踪看到异常,所以我不确定这是错误还是其他原因?

java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)

下面是我的代码:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();

private static class Holder {
private static final SocketManager instance = new SocketManager();
}

public static SocketManager getInstance() {
return Holder.instance;
}

private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}

// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}

private List<SocketHolder> connect(List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);

SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}

// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}

private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}

// runs every 30 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);

// pinging to see whether a socket is live or not
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;

SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}

这是我如何使用 getNextSocket() SocketManager的方法从多个读取器线程同时进行类:
// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}

public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
final boolean messageA) {
ZMsg msg = new ZMsg();
msg.add(reco);
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(addr, reco);
return sent;
}

public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket, true);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// ...
return sent;
}

我不认为这是正确的,我相信。好像 getNextSocket()可以返回 0MQ socketthread A .同时,定时器线程可以访问相同的 0MQ socket ping它。在这种情况下 thread A和计时器线程正在发生相同的变化 0MQ socket ,这会导致问题。那么解决此问题的最佳和有效方法是什么?

注: SocketHolder 是一个不可变的类

更新:

我刚刚注意到我的另一个盒子上发生了同样的问题 ArrayIndexOutOfBoundsException但这次它的 71 行号在 "YQueue"文件。唯一一致的始终是 256。所以肯定有一些与 256 相关的东西,我无法弄清楚这里的 256 是什么?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.backPos(YQueue.java:71)
at zmq.YPipe.write(YPipe.java:51)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)

最佳答案

事实 #0:ZeroMQ 不是线程安全的——根据定义

虽然 ZeroMQ 文档和 Pieter HINTJENS 的优秀著作“Code Connected. Volume 1”并没有忘记尽可能提醒这一事实,但在线程之间返回甚至共享 ZeroMQ 套接字实例的想法时常出现。当然,类实例的方法可能会在它们的内部方法和属性中提供这种几乎“隐藏”的东西,但是适当的设计努力应该防止任何此类副作用,没有异常(exception),没有任何借口。

如果有定量事实的合理支持,共享可能是 zmq.Context() 的常见实例的一种方式。 ,但清晰的分布式系统设计可能存在于真正的多代理方案中,其中每个代理运行自己的 Context() -引擎,根据配置和性能偏好的各自组合进行微调。

那么解决此问题的最佳和有效方法是什么?

永远不要共享一个 ZeroMQ 套接字。从来没有,确实。即使最新的发展开始 promise 在不久的将来会在这个方向上发生一些变化。用共享污染任何高性能、低延迟的分布式系统设计都是一个坏习惯。不共享是该领域的最佳设计原则。

是的,我可以看到我们不应该在线程之间共享套接字,但是在我的代码中,您认为解决此问题的最佳方法是什么?

是的,解决这个问题的最好和有效的方法是永远不要共享一个 ZeroMQ 套接字。

这意味着永远不要返回任何对象,其属性是 ZeroMQ 套接字(您主动构建并从 .connect(){...} 类方法中大量返回。在您的情况下,所有类方法似乎保留 private ,这可能融合了允许“其他线程”接触类私有(private)套接字实例的问题,但同样的原则也必须在所有属性级别上背书,以便是有效的。最后,这种“融合”走捷径,被
public static SocketManager getInstance() ,
它混杂地提供任何外部请求者来直接访问共享 ZeroMQ 套接字的类私有(private)实例。

如果某些文档几乎在每一章中都明确警告不要共享事物,那么人们宁可不要共享事物。

所以,重新设计方法,使SocketManager获得更多功能,因为它是类方法,它将执行嵌入的必备功能,以便 如 ZeroMQ 出版物中所述,明确防止任何外部世界线程接触不可共享的实例。

接下来是资源 list :您的代码似乎每 30 秒重新检查所有感兴趣的数据中心中的世界状态。这实际上每分钟创建两次新的 List 对象。虽然你可能会投机让垃圾收集器来整理所有 thrash,这不是从任何地方进一步引用的,对于与 ZeroMQ 相关的对象来说,这不是一个好主意,从您之前的重新检查运行中嵌入到 List-s 中。 ZeroMQ 对象仍然从 Zcontext() 内部引用- ZeroMQ Context() -core-factory 实例化了 I/O 线程,也可以看作是 ZeroMQ 套接字库存资源管理器。所以,所有的 new -created socket-instances 不仅获得了来自 java 的外部句柄侧面,也是内部 handle ,来自 (Z)Context() 内部.到现在为止还挺好。但是在代码中的任何地方都没有看到任何方法,这些方法会取消对象实例中的任何和所有 ZeroMQ 套接字,这些套接字已与 java 解除关联。 -side,但仍然引用自 (Z)Context() -边。分配资源的显式资源退役是一种公平的设计方面的做法,对于有限或以其他方式受到约束的资源而言更是如此。 { "cheap"| 执行此操作的方式可能有所不同。 “昂贵的”} - 这种资源管理处理的维护成本(ZeroMQ 套接字实例作为一些轻量级的“消耗品/一次性”来处理是非常昂贵的......但那是另一回事了)。

所以,再加上一套适当的资源再利用/资源拆解方法,就可以得到new的总量。 -创建的套接字回到您的控制之下(您的代码负责 (Z)Context() -domain-of-resources-control 内有多少套接字处理程序可能会被创建并且必须保持被管理- - 不管是有意还是无意)。

有人可能会反对自动检测和(可能很好地延迟)垃圾收集可能有一些“ promise ”,但是,您的代码仍然负责适当的资源管理,即使 LMAX 人员也永远不会获得如此勇敢的性能,如果他们依赖来自标准 gc 的“ promise ”。您的问题比 LMAX 顶级性能必须解决的问题严重得多。您的代码(到目前为止已发布)对 没有任何作用.close() .term() ZeroMQ 相关的资源。在一个消费不受控制(分布式需求)的生态系统中,这是一种完全不可能的做法。 您必须保护您的船,以免超出您知道它可以安全处理的极限并动态卸载在“对岸”没有收件人的每个盒子。

那是船长的 (您的代码设计师的)责任。

没有明确告诉最低级别的库存管理负责人( ZeroMQ Context() -floor )有些箱子要卸载,问题仍然是你的。标准 gc -chain-of-command 不会“自动”执行此操作,无论“ promise ”看起来像什么,它都不会。因此,对您的 ZeroMQ 资源管理要明确,通过对这些步骤进行排序来评估返回代码,并适当处理在您的代码明确控制下执行这些资源管理操作所引发的任何和所有异常。

更低(如果不是可达到的最低值)资源利用率 - 信封和更高(如果不是最高的)性能 是正确完成这项工作的奖励。 LMAX 家伙是一个很好的例子,在这方面做得非常好,远远超出了标准的 Java“ promise ”,所以人们可以向最好的人学习。

声明的调用签名与使用的调用签名似乎不匹配:
虽然我在这一点上可能是错的,因为我的大部分设计工作都不是在 多态调用接口(interface),签名中似乎存在不匹配,发布为:

private List<SocketHolder> connect( Datacenters  dc,                     // 1-st
List<String> addresses, // 2-nd
int socketType // 3-rd
) {
... /* implementation */
}

和实际的方法调用,在 connectToZMQSockets() 中调用方法只是通过:
        List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
ZMQ.PUSH // 2-nd
);

关于java.lang.ArrayIndexOutOfBoundsException : 256 with jeromq 0. 3.6 版本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47084567/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com