gpt4 book ai didi

java - rabbitmq AlreadyClosedException

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:56:21 24 4
gpt4 key购买 nike

我正在将 rabbitmq 服务器与 java 应用程序一起使用。当应用程序在特定队列上接收到消息时,它会生成一些数据并将它们发送到另一个队列。

运行应用程序时,它会接收消息、生成数据并将它们发送到正确的队列中。数据在服务器上很好地接收并且它们是正确的。但是当应用程序尝试向服务器发送确认时,我收到了一个 AlreadyClosedException。

我在服务器日志中有以下消息:正在关闭 AMQP 连接。

这是 rabbitMQ 消费者类中 handleDelivery 函数的代码:

public void handleDelivery( String consumerTag, Envelope envelope, asicProperties properties, byte[] body )
throws IOException {
actionManager.receivedSelectedData( body );
getChannel().basicAck( envelope.getDeliveryTag(), false );
}

这是 receivedSelectedData() 方法中的代码,其中数据在发送前生成:

public void receivedSelectedData( byte[] body ) {
differentialEquations = differentialEquationsObjectManager.fromBytes( body );
timeSeriesCollection.removeAllSeries();
for ( int i = 0; i < differentialEquations.size(); i++ ) {
differentialEquation = differentialEquations.get( i );
for ( int j = 0; j < differentialEquation.getSystems().size(); j++ ) {
try {
generator = new NumericalMethod( differentialEquation, j );
} catch ( Exception e ) {
e.printStackTrace();
}
try {
timeSeriesCollection.addSeries( generator.equationToTimeseriesRK4( 10.0 ) );
} catch ( Exception e ) {
e.printStackTrace();
}
}
}
sender.publish( RabbitMQGenerateSender.GENERATE_DATA_QUEUE_NAME,
timeSeriesCollectionObjectMnanager.toBytes( timeSeriesCollection ) );
}

队列似乎已正确声明,这是我的队列声明:

protected void declareQueue( String queueName ) {
try {
channel.queueDeclare( queueName, true, false, false, null );
} catch ( IOException e ) {
e.printStackTrace();
}
}

和 channel 声明:

try {
connection = factory.newConnection();
channel = connection.createChannel();
int prefetchCount = 1;
channel.basicQos( prefetchCount );
} catch ( IOException | TimeoutException e ) {
e.printStackTrace();
}

我有一些其他应用程序使用具有相同 channel 和队列声明参数的 rabbitmq,它们运行良好。我只有一个应用程序在发送确认时系统性地失败。

这是 getChannel() 方法:

public Channel getChannel() {
return channel;
}

最佳答案

如果要支持 ACK 功能,则必须将接收队列声明为 AutoDelete = false。这是 C# 中的示例(与 Java 中可能存在细微差别)

private bool PushDataToQueue(byte[] data, string queueName, ref string error)
{
try
{
if (_connection == null || !_connection.IsOpen)
_connection = _factory.CreateConnection();

using (IModel channel = _connection.CreateModel())
{
if (AutoCloseConnection)
_connection.AutoClose = AutoCloseConnection;
// Set the AutoDelete as false, fourth parameter!!!
channel.QueueDeclare(queueName, true, false, false, null);
channel.BasicPublish("", queueName, null, data);
if (!channel.IsClosed)
channel.Close();
}
}
catch (Exception e)
{
error = string.Format("Failed pushing data to queue name '{0}' on host '{1}' with user '{2}'. Error: {3}", queueName, _factory.HostName, _factory.UserName,
e.GetCompleteDetails());
return false;
}

return true;
}

关于java - rabbitmq AlreadyClosedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34284029/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com