- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
通过stackoverflow搜索并发布这个问题,因为没有解决方案对我有用,我的问题可能与其他问题不同。
我正在编写一个脚本,它从rabbitMQ 队列中获取一篇文章并处理该文章以计算单词并从中提取关键字并将其转储到数据库中。我的脚本工作正常,但执行一段时间后,我收到此异常(-1, "ConnectionResetError(104, 'Connection reset by peer')")
我不知道为什么我会得到这个。我已经尝试了很多在 stackover 流上可用的解决方案,但没有一个对我有用。我写了我的脚本并以两种不同的方式进行了尝试。两者都工作正常,但一段时间后发生相同的异常。
这是我的第一个代码:
def app_main():
global channel, results, speedvars
Logger.log_message('Starting app main')
# Edit 4
def pika_connect():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In pika connect")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
Logger.log_message('Starting loop')
channel.start_consuming()
#########
speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()
sender = ResultsSender(results, speedvars)
sender.start()
# Edit 5 starting 10 threads to listen to pika
for th in range(qthreads):
Logger.log_message('Starting thread: '+str(th))
try:
t = Thread(target=pika_connect, args=())
t.start()
except Exception as e:
Logger.error_message("Exception in starting threads " + str(e))
try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))
def app_main():
global channel, results, speedvars
Logger.log_message('Starting app main')
speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()
sender = ResultsSender(results, speedvars)
sender.start()
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In app main")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
Logger.log_message('Starting loop')
try:
channel.start_consuming()
except Exception as e:
Logger.error_message("Exception in start_consuming in main " + str(e))
raise e
try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))
def on_message(ch, method, properties, message):
Logger.log_message("Starting parsing new msg ")
handle_message(message)
import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)
from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools
from pid.decorator import pidfile
Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None
SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'
class SpeedVars(object):
vars = {}
lock = None
def __init__(self):
self.lock = threading.Lock()
def inc(self, var):
self.lock.acquire()
try:
if var in self.vars:
self.vars[var] += 1
else:
self.vars[var] = 1
finally:
self.lock.release()
def dec(self, var):
self.lock.acquire()
try:
if var in self.vars:
self.vars[var] -= 1
else:
Logger.error_message('Cannot decrement ' + var + ', not tracked')
finally:
self.lock.release()
def get(self, var):
out = None
self.lock.acquire()
try:
if var in self.vars:
out = self.vars[var]
else:
Logger.error_message('Cannot get ' + var + ', not tracked')
finally:
self.lock.release()
return out
def get_all(self):
out = None
self.lock.acquire()
try:
out = self.vars.copy()
finally:
self.lock.release()
return out
class SpeedTracker(threading.Thread):
speedvars = None
start_ts = None
last_vars = {}
def __init__(self, speedvars):
super(SpeedTracker, self).__init__()
self.start_ts = time.time()
self.speedvars = speedvars
Logger.log_message('Setting up speed tracker')
def run(self):
while True:
time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
prev = self.last_vars
cur = self.speedvars.get_all()
now = time.time()
if len(prev) > 0:
q = {}
for key in cur:
qty = cur[key] - prev[key]
avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
overall_avg = cur[key] / (now - self.start_ts)
Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
+ ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
+ ', overall speed ' + '%0.2f' % overall_avg + '/sec')
pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
pending_avg = pending / (now - self.start_ts)
Logger.log_message('Speed-tracking (pending): total ' + str(pending)
+ ', overall speed ' + '%0.2f' % pending_avg + '/sec')
self.last_vars = cur
class ResultsSender(threading.Thread):
channel = None
results = None
speedvars = None
def __init__(self, results, speedvars):
super(ResultsSender, self).__init__()
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
self.channel = connection.channel()
Logger.log_message('Setting up output exchange')
self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
self.results = results
self.speedvars = speedvars
def run(self):
while True:
item = self.results.get()
self.channel.basic_publish(
exchange=Config.AMQ_DAEMONS['consumer']['output'],
routing_key='',
body=item)
self.speedvars.inc(SPD_SENT)
def parse_message(message):
try:
bodytxt = message.decode('UTF-8')
body = json.loads(bodytxt)
return body
except Exception as e:
Logger.error_message("Cannot parse message - " + str(e))
raise e
def get_body_elements(body):
try:
artid = str(body.get('article_id'))
article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
date = article_dt.strftime(Config.DATE_FORMAT)
article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
return (artid, date, article)
except Exception as e:
Logger.error_message("Cannot retrieve article attributes " + str(e))
raise e
def process_article(id, date, text):
global results, speedvars
try:
Logger.log_message('Processing article ' + id)
keywords = Pipeline.extract_keywords(text)
send_data = {"id": id, "date": date, "keywords": keywords}
results.put(pickle.dumps(send_data))
# print('Queue Size:',results.qsize())
except Exception as e:
Logger.error_message("Problem processing article " + str(e))
raise e
def ack_message(ch, delivery_tag):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
Logger.error_message("Channel is already closed, so we can't ACK this message" + str(e))
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
#pass
def handle_message(connection, ch, delivery_tag, message):
global speedvars
start = time.time()
thread_id = threading.get_ident()
try:
speedvars.inc(SPD_RECEIVED)
body = parse_message(message)
(id, date, text) = get_body_elements(body)
words = len(text.split())
if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
process_article(id, date, text)
else:
Logger.log_message('Ignoring article, over word count limit')
speedvars.inc(SPD_DISCARDED)
except Exception as e:
Logger.error_message("Could not process message - " + str(e))
cb = functools.partial(ack_message, ch, delivery_tag)
connection.add_callback_threadsafe(cb)
Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag))
Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))
# CALL BACK
## def on_message(ch, method, properties, message):
## global executor
## executor.submit(handle_message, message)
def on_message(ch, method, header_frame, message, args):
(connection, threads) = args
delivery_tag = method.delivery_tag
t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
t.start()
threads.append(t)
####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
global channel, results, speedvars
speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()
sender = ResultsSender(results, speedvars)
sender.start()
# Pika Connection
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
#channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
channel.basic_qos(prefetch_count=1)
threads = []
on_message_callback = functools.partial(on_message, args=(connection, threads))
channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])
Logger.log_message('Starting loop')
## channel.start_consuming()
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
Wait for all to complete
for thread in threads:
thread.join()
connection.close()
app_main()
最佳答案
您的 handle_message
方法正在阻止心跳,因为您的所有代码(包括 Pika I/O 循环)都在同一线程上运行。退房 this example了解如何在独立于 Pikas I/O 循环的线程上运行您的工作 (handle_message
),然后正确确认消息。
注意:RabbitMQ 团队监控 the rabbitmq-users
mailing list并且有时只回答有关 StackOverflow 的问题。
关于python-3.x - RabbitMQ Pika 连接重置 , (-1, ConnectionResetError(104, 'Connection reset by peer' )),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54003433/
每次暂存文件时,如果您需要取消暂存文件,Git 都会提供有用的说明: (use "git reset HEAD ..." to unstage) 不过体面Git Tutorials by Atlass
我需要添加几个文件以将它们组合到一个提交中,但我必须排除其中一个。在 this answer ,执行此操作的代码是: git add -u git reset -- file_to_ignore.da
这个问题在这里已经有了答案: What are typical use cases of git-reset's --merge and --keep flags? (4 个答案) 关闭 6 年前。
有时候,进行了错误的提交,但是还没有push到远程分支,想要撤销本次提交,可以使用git reset –-soft/hard命令。 1、二者区别: git reset –-soft:回退到某个版
我认为:软重置:从重置向量启动。硬复位:拉CPU的电平。 最佳答案 硬复位当然意味着整个CPU芯片及其所有外设都被复位。造成这种情况的原因可能有很多:复位引脚被外部拉高、时钟故障、片内低电压检测、看门
$python manage.py reset Unknown command: 'reset' Type 'manage.py help' for usage. 在django 1.6中是否取消了这
我是 git 的新手,所以问题可能很简单,git reset --hard HEAD 和 git reset --hard 有什么区别? 最佳答案 HEAD 在您未指定该参数时是隐含的。 但是,您可以
我使用 apollo-link-state 来本地存储错误,但清除缓存后出现以下错误。 我已在 apollo 客户端配置选项中将 errors 的默认值设置为空数组 []。 但是,在 apolloCl
我正在使用 bool 数组来存储标志(类似于“已更改”)。数组的大小是静态的,在编译时已知。 我需要定期重置数组,即将所有元素设置为 false。我应该使用常规数组和类似 memset 或 memcp
在 git 文档(和许多 SO 线程)中,推荐使用这种重置方法: $ git reset --soft HEAD^ ;# go back to WIP state $ git reset
在我的实验中,我没能发现两者之间的任何功能差异 git reset --hard 和 git reset --merge 使用说明也没有给出任何提示 --hard res
如何重置所有列过滤器?调用 reset() 似乎重置了表,但过滤器的所有字段保持不变。 最佳答案 您可以将输入值绑定(bind)到表的过滤器,如下所示: 注意 [value] 绑定(bind)。 关
使用 std::unique_ptr::reset,您可以轻松地将您的实例恢复到新状态。 C++11 之前,为了实现类似的行为,我看到很多类都定义了一个 Reset() 方法来重置其所有内部成员。但现
为了恢复工作树和索引中的更改,此答案 ( https://stackoverflow.com/a/5812972/8278160) 建议运行以下命令: git reset --hard 运行它是否与运
我目前正在测试竞技场。我本来以为这段代码会编译,但在运行时失败了,令人惊喜的是,编译器发现了这个问题。但我不知道它的推理是否正确。有人能给我解释一下吗?。错误:。来自umpalo的相关代码:
我目前正在测试竞技场。我本来以为这段代码会编译,但在运行时失败了,令人惊喜的是,编译器发现了这个问题。但我不知道它的推理是否正确。有人能给我解释一下吗?。错误:。来自umpalo的相关代码:
我正在尝试在不触发“重置”事件的情况下重置我的收藏。我已经设置了我的收藏来收听“重置”和“添加”事件 @.listenTo(@options.muses, 'add', @addOne) @.list
根据http://en.cppreference.com/w/cpp/memory/unique_ptr/reset , void reset( pointer ptr = pointer() );
我有一个别名,unstage,用于从暂存区域中删除更改。 unstage = reset -- 我注意到 git 的帮助建议改为 git reset HEAD。我还注意到 git rm --cache
这个问题在这里已经有了答案: What's the difference between HEAD^ and HEAD~ in Git? (17 个答案) 关闭 6 年前。 git reset --
我是一名优秀的程序员,十分优秀!