gpt4 book ai didi

rabbitmq - 服务器空间不足时 basicPublish 超时

转载 作者:行者123 更新时间:2023-12-01 10:46:03 25 4
gpt4 key购买 nike

我的情况是 rabbitmq 服务器空间不足,如下所示

Filesystem                       1K-blocks    Used Available Use% Mounted on
/dev/mapper/ramonubuntu--vg-root 6299376 5956336 0 100% /

生产者向服务器发布消息(消息需要持久化),然后会一直阻塞,一直等待发布的响应。当然应该避免服务器空间不足的情况,但是有没有超时机制让producer退出等待呢?

我已经尝试过 heartbeat 和 SO_TIMEOUT,它们都不起作用,因为网络工作正常。下面是我的制作人。

 protected void publish(byte[] message) throws Exception {
// ConnectionFactory can be reused between threads.
ConnectionFactory factory = new SoTimeoutConnectionFactory();
factory.setHost(this.getHost());
factory.setVirtualHost("te");
factory.setPort(5672);
factory.setUsername("amqp");
factory.setPassword("amqp");
factory.setConnectionTimeout(10 * 1000);
// doesn't help if server got out of space
factory.setRequestedHeartbeat(1);
final Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(this.exchangeName, "topic", true);

channel.addReturnListener(new ReturnListener() {

@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
logger.warn("[X]Returned message(replyCode:" + replyCode + ",replyText:" + replyText
+ ",exchange:" + exchange + ",routingKey:" + routingKey + ",body:" + new String(body));
}

});

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {

@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
logger.info("Ack: " + deliveryTag);
// RabbitMessagePublishMain.this.release(connection);
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
logger.info("Nack: " + deliveryTag);
// RabbitMessagePublishMain.this.release(connection);
}

});

channel.basicPublish(this.exchangeName, RabbitMessageConsumerMain.EXCHANGE_NAME + ".-1", true,
MessageProperties.PERSISTENT_BASIC, message);
channel.waitForConfirmsOrDie(10*1000);
// now we can close connection
connection.close();
}

它将阻塞在“channel.waitForConfirmsOrDie(10*1000);”和 SotimeoutConnectionFactory,

public class SoTimeoutConnectionFactory extends ConnectionFactory {

@Override
protected void configureSocket(Socket socket) throws IOException {
super.configureSocket(socket);
socket.setSoTimeout(10 * 1000);
}
}

我还捕获了生产者和 rabbimq 之间的网络, enter image description here

请帮忙。

最佳答案

您需要实现 Connection Block/Unblocked .

这基本上是一种通知发布者服务器资源不足的方式。这样做的好处是,一旦可以安全地再次发布,发布者也会收到通知。

我建议您看看这个 article .实现这一点的一个简单方法是设置一个标志,指示发布是否安全,如果不安全,请等到发布。

作为示例,您可以看看我是如何实现 this 的在我的一个 Python 示例中。

关于rabbitmq - 服务器空间不足时 basicPublish 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25882074/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com