gpt4 book ai didi

php - Rabbitmq - php amqp broken broken pipe 错误

转载 作者:可可西里 更新时间:2023-11-01 00:02:58 30 4
gpt4 key购买 nike

我正在处理一个巨大的 xml 文档(其中包含大约一百万个条目),然后使用 rabbitmq 将格式化版本导入数据库。每次发布大约 200,000 个条目后,我都会收到一个 broken pipe 错误,并且 rabbitmq 无法从中恢复。

Notice Error: fwrite(): send of 2651 bytes failed with errno=11 Resource temporarily unavailable in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 33 bytes failed with errno=104 Connection reset by peer in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

这随后会导致节点关闭错误,需要手动终止进程才能从中恢复。

这些是我的类方法:-

public function publishMessage($message) {
if (!isset($this->conn)) {
$this->_createNewConnectionAndChannel();
}
try {
$this->ch->basic_publish(
new AMQPMessage($message, array('content_type' => 'text/plain')),
$this->defaults['exchange']['name'],
$this->defaults['binding']['routing_key']
);
} catch (Exception $e) {
echo "Caught exception : " . $e->getMessage();
echo "Creating new connection.";
$this->_createNewConnectionAndChannel();
$this->publishMessage($message); // try again
}
}

protected function _createNewConnectionAndChannel() {
if (isset($this->conn)) {
$this->conn->close();
}

if(isset($this->ch)) {
$this->ch->close();
}

$this->conn = new AMQPConnection(
$this->defaults['connection']['host'],
$this->defaults['connection']['port'],
$this->defaults['connection']['user'],
$this->defaults['connection']['pass']
);
$this->ch = $this->conn->channel();
$this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
$this->ch->basic_qos(0 , 20 , 0); // fair dispatching

$this->ch->queue_declare(
$this->defaults['queue']['name'],
$this->defaults['queue']['passive'],
$this->defaults['queue']['durable'],
$this->defaults['queue']['exclusive'],
$this->defaults['queue']['auto_delete']
);

$this->ch->exchange_declare(
$this->defaults['exchange']['name'],
$this->defaults['exchange']['type'],
$this->defaults['exchange']['passive'],
$this->defaults['exchange']['durable'],
$this->defaults['exchange']['auto_delete']
);

$this->ch->queue_bind(
$this->defaults['queue']['name'],
$this->defaults['exchange']['name'],
$this->defaults['binding']['routing_key']
);
}

我们将不胜感激。

最佳答案

确保您已在 Rabbit MQ 上为您的用户添加了虚拟主机访问权限。我创建了新用户并忘记了为默认使用的“/”主机设置访问权限。

您可以通过管理面板 yourhost:15672 > Admin > 点击用户 > 寻找“设置权限”。

附言我假设您的 RabbitMQ 服务正在运行,用户存在且密码正确。

关于php - Rabbitmq - php amqp broken broken pipe 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15842460/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com