gpt4 book ai didi

java - BlockingQueue 消费者在队列不为空时没有响应

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:18:18 25 4
gpt4 key购买 nike

我有一个分布式系统,它的节点通过套接字接收消息对象。当在另一个线程中接收和处理消息时,消息将写入 BlockingQueue。我确保一台机器中只有一个 BlockingQueue 实例。的传入速率非常高,大约每秒数千个。消费者起初运行良好,但在一段时间后阻塞(完全没有响应)——我检查过 BlockingQueue 不为空,因此不应被 BlockingQueue.take() 阻塞。当我手动降低传入消息对象的速率时,消费者工作得非常好。这很令人困惑......

你能帮我找出问题所在吗?非常感谢。

消费者代码:

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(id+"-machine-worker")
.setDaemon(false)
.setPriority(Thread.MAX_PRIORITY)
.build();
ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
executor.submit(new Worker(machine));

public static class Worker implements Runnable {
Machine machine;
public Worker(Machine machine) {
this.machine = machine;
}
@Override
public void run() {
while (true) {
try {
Message message = machine.queue.take();
// Do my staff here...
} catch (Exception e) {
logger.error(e);
}
}
}
}

生产者代码:

// Below code submits the SocketListener runnable described below
ExecutorService worker;
Runnable runnable = socketHandlerFactory.getSocketHandlingRunnable(socket, queue);
worker.submit(runnable);

public SocketListener(Socket mySocket, Machine machine, LinkedBlockingQueue<Message> queue) {
this.id = machine.id;
this.socket = mySocket;
this.machine = machine;
this.queue = queue;

try {
BufferedInputStream bis = new BufferedInputStream(socket.getInputStream(), 8192*64);
ois = new ObjectInputStream(bis);
} catch (Exception e) {
logger.error("Error in create SocketListener", e);
}
}

@Override
public void run() {
Message message;
try {
boolean socketConnectionIsAlive = true;
while (socketConnectionIsAlive) {
if (ois != null) {
message = (Message) ois.readObject();
queue.put(message);
}
}
} catch (Exception e) {
logger.warn(e);
}
}

最佳答案

如果您使用的是无界队列,则可能会发生由于内存压力导致整个系统陷入停滞的情况。此外,这意味着生产强度不受消耗强度的限制。所以,使用有界队列。

另一个建议:当你的阻塞条件发生时,获取一个完整的线程堆栈跟踪转储,以确定消费者阻塞的地方。你可能会在那里得到惊喜。

关于java - BlockingQueue 消费者在队列不为空时没有响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16298800/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com