gpt4 book ai didi

python-3.x - RabbitMQ Pika 连接重置 , (-1, ConnectionResetError(104, 'Connection reset by peer' ))

转载 作者:行者123 更新时间:2023-12-04 01:49:30 27 4
gpt4 key购买 nike

通过stackoverflow搜索并发布这个问题,因为没有解决方案对我有用,我的问题可能与其他问题不同。

我正在编写一个脚本,它从rabbitMQ 队列中获取一篇文章并处理该文章以计算单词并从中提取关键字并将其转储到数据库中。我的脚本工作正常,但执行一段时间后,我收到此异常(-1, "ConnectionResetError(104, 'Connection reset by peer')")
我不知道为什么我会得到这个。我已经尝试了很多在 stackover 流上可用的解决方案,但没有一个对我有用。我写了我的脚本并以两种不同的方式进行了尝试。两者都工作正常,但一段时间后发生相同的异常。

这是我的第一个代码:

def app_main():

global channel, results, speedvars
Logger.log_message('Starting app main')

# Edit 4
def pika_connect():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In pika connect")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

Logger.log_message('Starting loop')
channel.start_consuming()

#########

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()


# Edit 5 starting 10 threads to listen to pika

for th in range(qthreads):
Logger.log_message('Starting thread: '+str(th))
try:
t = Thread(target=pika_connect, args=())
t.start()
except Exception as e:
Logger.error_message("Exception in starting threads " + str(e))



try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))

这是我的第二个代码:
def app_main():

global channel, results, speedvars
Logger.log_message('Starting app main')

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In app main")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

Logger.log_message('Starting loop')

try:
channel.start_consuming()
except Exception as e:
Logger.error_message("Exception in start_consuming in main " + str(e))
raise e


try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))

在我的第一个代码中,我使用了线程,因为我想加快处理文章的过程。

这是我的回调函数 def on_message(ch, method, properties, message):
Logger.log_message("Starting parsing new msg ")
handle_message(message)

编辑:完整代码
import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
vars = {}
lock = None

def __init__(self):
self.lock = threading.Lock()

def inc(self, var):

self.lock.acquire()
try:
if var in self.vars:
self.vars[var] += 1
else:
self.vars[var] = 1
finally:
self.lock.release()


def dec(self, var):

self.lock.acquire()
try:
if var in self.vars:
self.vars[var] -= 1
else:
Logger.error_message('Cannot decrement ' + var + ', not tracked')
finally:
self.lock.release()

def get(self, var):

out = None
self.lock.acquire()
try:
if var in self.vars:
out = self.vars[var]
else:
Logger.error_message('Cannot get ' + var + ', not tracked')
finally:
self.lock.release()


return out

def get_all(self):

out = None
self.lock.acquire()
try:
out = self.vars.copy()
finally:
self.lock.release()


return out


class SpeedTracker(threading.Thread):
speedvars = None
start_ts = None
last_vars = {}

def __init__(self, speedvars):
super(SpeedTracker, self).__init__()
self.start_ts = time.time()
self.speedvars = speedvars
Logger.log_message('Setting up speed tracker')

def run(self):
while True:
time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
prev = self.last_vars
cur = self.speedvars.get_all()
now = time.time()
if len(prev) > 0:
q = {}
for key in cur:
qty = cur[key] - prev[key]
avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
overall_avg = cur[key] / (now - self.start_ts)
Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
+ ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
+ ', overall speed ' + '%0.2f' % overall_avg + '/sec')
pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
pending_avg = pending / (now - self.start_ts)
Logger.log_message('Speed-tracking (pending): total ' + str(pending)
+ ', overall speed ' + '%0.2f' % pending_avg + '/sec')
self.last_vars = cur


class ResultsSender(threading.Thread):
channel = None
results = None
speedvars = None

def __init__(self, results, speedvars):
super(ResultsSender, self).__init__()
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
self.channel = connection.channel()
Logger.log_message('Setting up output exchange')
self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
self.results = results
self.speedvars = speedvars

def run(self):
while True:
item = self.results.get()
self.channel.basic_publish(
exchange=Config.AMQ_DAEMONS['consumer']['output'],
routing_key='',
body=item)
self.speedvars.inc(SPD_SENT)

def parse_message(message):
try:
bodytxt = message.decode('UTF-8')
body = json.loads(bodytxt)
return body
except Exception as e:
Logger.error_message("Cannot parse message - " + str(e))
raise e

def get_body_elements(body):
try:
artid = str(body.get('article_id'))
article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
date = article_dt.strftime(Config.DATE_FORMAT)
article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
return (artid, date, article)
except Exception as e:
Logger.error_message("Cannot retrieve article attributes " + str(e))
raise e

def process_article(id, date, text):
global results, speedvars
try:
Logger.log_message('Processing article ' + id)
keywords = Pipeline.extract_keywords(text)
send_data = {"id": id, "date": date, "keywords": keywords}
results.put(pickle.dumps(send_data))
# print('Queue Size:',results.qsize())
except Exception as e:
Logger.error_message("Problem processing article " + str(e))
raise e

def ack_message(ch, delivery_tag):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
Logger.error_message("Channel is already closed, so we can't ACK this message" + str(e))
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
#pass

def handle_message(connection, ch, delivery_tag, message):
global speedvars
start = time.time()
thread_id = threading.get_ident()

try:
speedvars.inc(SPD_RECEIVED)
body = parse_message(message)
(id, date, text) = get_body_elements(body)
words = len(text.split())
if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
process_article(id, date, text)
else:
Logger.log_message('Ignoring article, over word count limit')
speedvars.inc(SPD_DISCARDED)

except Exception as e:
Logger.error_message("Could not process message - " + str(e))

cb = functools.partial(ack_message, ch, delivery_tag)
connection.add_callback_threadsafe(cb)

Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag))
Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))

# CALL BACK
## def on_message(ch, method, properties, message):
## global executor
## executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
(connection, threads) = args
delivery_tag = method.delivery_tag
t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
t.start()
threads.append(t)


####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
global channel, results, speedvars

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()


# Pika Connection
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()

Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

#channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
channel.basic_qos(prefetch_count=1)
threads = []
on_message_callback = functools.partial(on_message, args=(connection, threads))
channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

Logger.log_message('Starting loop')
## channel.start_consuming()
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()

Wait for all to complete
for thread in threads:
thread.join()

connection.close()


app_main()

鼠兔并没有花很多时间来处理消息,但我仍然面临连接重置问题。
**处理消息所用的总时间:0.0005991458892822266
**

最佳答案

您的 handle_message方法正在阻止心跳,因为您的所有代码(包括 Pika I/O 循环)都在同一线程上运行。退房 this example了解如何在独立于 Pikas I/O 循环的线程上运行您的工作 (handle_message),然后正确确认消息。

注意:RabbitMQ 团队监控 the rabbitmq-users mailing list并且有时只回答有关 StackOverflow 的问题。

关于python-3.x - RabbitMQ Pika 连接重置 , (-1, ConnectionResetError(104, 'Connection reset by peer' )),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54003433/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com