gpt4 book ai didi

python - Pika 无明显原因崩溃

转载 作者:行者123 更新时间:2023-12-01 04:56:17 32 4
gpt4 key购买 nike

我有一个问题。我的鼠兔客户端永久崩溃并显示错误消息。

这就是发生的事情:

  1. RabbitMQ 正在运行,生产者已将消息推送到队列中
  2. 我启动 python 脚本,它会处理队列中缓冲的所有包
  3. 我的脚本定期抛出异常:ConnectionClosed,但我从未在任何地方关闭任何内容

这是我的代码:

import pika
import traceback

class RPCServer(object):


def __init__(self, callback, cfg):
self.cfg = cfg
self.callback = callback
self.credentials = None
self.parameters = None
self.connection = None
self.channel = None
self.counter = 0
self.initalize_me()


def initalize_me(self):
self.credentials = pika.PlainCredentials(self.cfg.USER, self.cfg.PASSWORD)
self.parameters = pika.ConnectionParameters(host=self.cfg.AMQP_HOST, credentials=self.credentials)
self.connection = pika.BlockingConnection(self.parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.cfg.RPC_EXCHANGE_NAME, type="direct")
self.channel.queue_declare(queue=self.cfg.RPC_QUEUE_NAME)
self.channel.queue_bind(exchange=self.cfg.RPC_EXCHANGE_NAME, queue=self.cfg.RPC_QUEUE_NAME, routing_key=self.cfg.RPC_ROUTING_KEY)
self.channel.basic_consume(self.rpc_callback, queue=self.cfg.RPC_QUEUE_NAME)
print "init= " + str(self.cfg.RPC_EXCHANGE_NAME) + " -> " + str(self.cfg.RPC_QUEUE_NAME) + " -> " + str(self.cfg.RPC_ROUTING_KEY)


def start_rpc_server(self):
print "Server: Start listening for RPC requests..."
try:
self.channel.start_consuming()
except:
print "Exception: " + str(traceback.format_exc())
self.initalize_me()
self.start_rpc_server()


def rpc_callback(self, ch, method, props, body):
self.counter += 1
if self.counter == 100:
print "100 package processed..."
self.counter = 0
result = self.callback(body)
properties = pika.BasicProperties(correlation_id=props.correlation_id)
ch.basic_publish(exchange="", routing_key=props.reply_to, properties=properties, body=result)
ch.basic_ack(delivery_tag=method.delivery_tag)

这是我运行它时的输出:

python RunPacer.py 
Initialize Configuration
Start Pacer
100 package processed...
100 package processed...
init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug
Server: Start listening for RPC requests...
Exception: Traceback (most recent call last):
File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server
self.channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events
raise exceptions.ConnectionClosed()
ConnectionClosed

100 package processed...
100 package processed...
init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug
Server: Start listening for RPC requests...
Exception: Traceback (most recent call last):
File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server
self.channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events
raise exceptions.ConnectionClosed()
ConnectionClosed

init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug
Server: Start listening for RPC requests...
100 package processed...
100 package processed...
Exception: Traceback (most recent call last):
File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server
self.channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events
raise exceptions.ConnectionClosed()
ConnectionClosed

抱歉,我的描述非常不准确,但是,这是因为我完全不知道为什么我的脚本崩溃。所以任何,任何建议都会有帮助。谢谢!

编辑:添加了rabbitmq的错误日志:

=INFO REPORT==== 4-Dec-2014::12:55:42 ===
accepting AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672)

=ERROR REPORT==== 4-Dec-2014::12:55:42 ===
Error on AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672, vhost: '/', user: 'username', state: running), channel 1:
{amqp_error,unexpected_frame,
"expected content body, got non content body frame instead",
'basic.publish'}

=INFO REPORT==== 4-Dec-2014::12:55:43 ===
closing AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672)

另外:我有一个在同一个队列上工作的Java程序(实际上是java中python脚本的副本),它运行没有任何问题。

最佳答案

这显然是 Pika 中的一个错误(参见此处:https://github.com/pika/pika/issues/349)。由于这个问题已经一年多没有得到解决(错误报告 12/2013 这篇文章:12/2014)我不会再依赖 pika 了。

但是,一个很棒(且快速)的替代方案(这会导致代码更少)是 librabbitmq: https://pypi.python.org/pypi/librabbitmq

以下是使用 librabbitmq 实现 RPC 调用的示例代码:

# -*- coding: utf-8 -*-

from librabbitmq import Connection
import uuid

class RPCClient(object):

def __init__(self, cfg):
self.cfg = cfg
self.connection = Connection(host=cfg.AMQP_HOST, userid=cfg.USER, password=cfg.PASSWORD)
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.queue
self.response = None
self.corr_id = None
self.channel.basic_consume(self.callback_queue, callback=self.process_response)


def process_response(self, msg):
if self.corr_id == msg.properties['correlation_id']:
self.response = str(msg.body)


def rpc_call(self, msg):
self.response = None
self.corr_id = str(uuid.uuid4())
props = {'reply_to' : str(self.callback_queue), 'correlation_id' : str(self.corr_id)}

self.channel.basic_publish(msg, exchange=self.cfg.RPC_EXCHANGE_NAME,
routing_key=self.cfg.RPC_ROUTING_KEY, **props)
while self.response is None:
self.connection.drain_events()
return str(self.response)

以及对应的RPCServer:

# -*- coding: utf-8 -*-

from librabbitmq import Connection

class RPCServer(object):

def __init__(self, callback, cfg):
self.cfg = cfg
self.callback = callback
self.connection = Connection(host=cfg.AMQP_HOST, userid=cfg.USER, password=cfg.PASSWORD)
self.channel = self.connection.channel()
self.channel.exchange_declare(cfg.RPC_EXCHANGE_NAME, "direct")
self.channel.queue_declare(cfg.RPC_QUEUE_NAME)
self.channel.queue_bind(cfg.RPC_QUEUE_NAME, cfg.RPC_EXCHANGE_NAME, cfg.RPC_ROUTING_KEY)
self.channel.basic_consume(cfg.RPC_QUEUE_NAME, callback=self.rpc_callback)


def start_rpc_server(self):
while True: self.connection.drain_events()


def rpc_callback(self, msg):
resp = self.callback(msg.body)
self.channel.basic_publish(resp, exchange="", routing_key=msg.properties['reply_to'], **msg.properties)
self.channel.basic_ack(msg.delivery_info['delivery_tag'])

关于python - Pika 无明显原因崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27294346/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com