gpt4 book ai didi

python - Python Redis和celery客户端过多,每次执行时都会出现不同的错误|任务使用pymsql连接到MySQL

转载 作者:行者123 更新时间:2023-12-01 01:26:13 31 4
gpt4 key购买 nike

我目前正在开发一个应用程序,该应用程序必须处理一些长期运行的任务。
我正在使用python 3flaskceleryredis

我在localhost上有一个可行的解决方案,但是在heroku上有很多错误,每次执行该应用程序都会触发一组不同的错误。我知道它不可能是随机的,所以我想弄清楚从哪里开始寻找。

我觉得redis一定有问题,我试图了解客户是什么以及客户来自何处,但是我找不到有关此主题的官方文档或解释。

题:

如果redis服务器已启动(即使在本地主机上),则连接了许多客户端,尽管我什么也没做。在heroku(我正在使用heroku-redis)上,我始终有6个客户端,而localhost是11个客户端。

我已经进行了一些研究,并能够通过以下方式进行展示:

if 'DYNO' in os.environ:
redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
redis_db = redis.StrictRedis()

# see what keys are in Redis
all_keys = redis_db.keys()
print (all_keys)

all_clients = redis_db.client_list()
print (all_clients)


我看到了所有这些客户,但是那里的信息对我没有任何帮助。这些是什么?他们为什么在那里?他们来自哪里?

所有的heroku redis插件都有一个客户端限制,因此我需要了解并优化此限制。首先,我想到了 clientsnumber == tasknumber,但事实并非如此。

总共我定义了12个任务,但是现在我正在测试2个任务(两个任务都在30秒以内完成)。

当我在本地主机上执行任务时,客户端将从11增加到16。如果我再次从16执行到18,并且此后它们始终保持为18,则无论我执行任务的频率如何都无关紧要。

那么这是怎么回事?我有2个任务,为什么客户从11增加到16,然后又从16增加到18?为什么在任务完成后不关闭它们?

我为整个问题苦苦挣扎了几天(尽管它在localhost上总是可以正常工作),因此欢迎任何帮助或想法。我需要开始寻找某个地方,因此目前我正在尝试了解客户。

编辑:

我安装了flower并尝试监视localhost上的2个任务,一切看起来都很好。它处理两个任务,并在几秒钟内都成功。返回值是正确的(但在localhost上总是可以正常工作)。

问题仍然是,在我开始开花之后,客户数量猛增到30。我仍然不知道:客户是什么?随着我生成的客户端数量的增加,我将需要一个100美元的附加组件来处理两个任务,这需要几秒钟才能完成,这不能成立,我仍然认为redis有点问题,即使在本地主机上也是如此。

我的redis设置非常简单:

if 'DYNO' in os.environ:
app.config['CELERY_BROKER_URL'] = 'redis://[the full URL from the redis add-on]'
app.config['CELERY_RESULT_BACKEND'] = 'redis://[the full URL from the redis add-on]'
else:
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])


这是一个任务示例:

@celery.task(bind=True)
def get_users_deregistrations_task(self, g_start_date, g_end_date):

start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

a1 = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
a2 = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
a3 = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

if a1 is None:
a1 = 0

if a2 is None:
a2 = 0

if a3 is None:
a3 = 0

amount = a1 + a2 + a3

return {'some_value' : amount}

# Selects user deregistrations between selected dates
@app.route('/get-users-deregistration', methods=["POST"])
@basic_auth.required
@check_verified
def get_users_deregistrations():
if request.method == "POST":

# init task
task = get_users_deregistrations_task.apply_async([session['g_start_date'], session['g_end_date']])
return json.dumps({}), 202, {'Location': url_for('taskstatus_get_users_deregistrations', task_id=task.id)}

@app.route('/status/<task_id>')
def taskstatus_get_users_deregistrations(task_id):
task = get_users_deregistrations_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info['current'],
'total': task.info['total'],
'status': 'Finished',
'statistic': task.info['statistic'],
'final_dataset': task.info
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
print ('in else')
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
return json.dumps(response)


编辑:

这是我的heroku proc文件:

web: gunicorn stats_main:app
worker: celery worker -A stats_main.celery --loglevel=info


编辑

我认为问题可能是连接池(在redis端),我没有正确使用它。

我还找到了一些芹菜配置,并添加了它们:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'], redis_max_connections=20, BROKER_TRANSPORT_OPTIONS = {
'max_connections': 20,
}, broker_pool_limit=None)


通过这些配置,我再次将所有内容上传到heroku。我仍然只测试2个任务,两个任务都很快速。

我已经在heroku上连续执行了10次任务,执行了7次。好像他们完成得太早了3次:返回的结果是错误的(正确的结果是30000,返回了3次18000)。

客户端迅速跳到20,但从未超过20,因此至少可以解决最大客户端错误和与Redis错误的连接丢失。

现在的大问题是任务可能会过早完成,非常重要的是返回的结果正确无误,性能一点也不重要。

编辑

没关系,什么都没有解决,一切似乎都是随机的。
我在其中一项任务中添加了两个 print()以进一步调试,并将其上传到heroku。执行2次后,我再次看到与Redis的连接丢失,达到了最大客户端数(但我的redismonitor插件显示客户端从未超过20个)

编辑

大量的客户端可能是由空闲的客户端引起的,由于某些原因,这些客户端从未关闭(在 heroku的博客文章中找到):


  默认情况下,Redis永远不会关闭空闲连接,这意味着
  如果您未明确关闭Redis连接,则将锁定
  自己脱离您的实例。
  
  为了确保不会发生这种情况,Heroku Redis设置了默认连接
  超时300秒。此超时不适用于
  非发布/订阅客户端以及其他阻止操作。


我现在在执行每一项任务之前为空闲的客户端添加了一个kill函数:

def kill_idle_clients():
if 'DYNO' in os.environ:
redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
redis_db = redis.StrictRedis()

all_clients = redis_db.client_list()
counter = 0
for client in all_clients:
if int(client['idle']) >= 15:
redis_db.client_kill(client['addr'])
counter += 1

print ('killing idle clients:', counter)


在任务启动之前,它将关闭所有空闲时间超过15秒的客户端。它可以再次在localhost上运行(但不足为奇,它始终可以在localhost上运行)。我的客户较少,但是在heroku上,它只工作10次的2倍。8次的任务又过早地完成了。也许空闲的客户端并不是真正的空闲,我不知道。

由于每次执行任务都会产生不同的结果(失去与redis的连接,达到客户端限制,过早完成,运行完美),这几乎也是无法测试的。

编辑

芹菜设置似乎一直被忽略。我一直对此感到怀疑,因此决定通过添加一些随机参数并将值更改为无意义来对其进行测试。我重启了芹菜工人办公室。

我原本希望看到一些错误,但是却没有任何反应。

在这些无意义的配置下,一切都像以前一样工作:

celery = Celery(app.name, broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'], redis_max_connections='pups', BROKER_TRANSPORT_OPTIONS = {
'max_connections': 20,
}, broker_pool_limit=None, broker_connection_timeout='pups', pups="pups")
celery.conf.broker_transport_options = {'visibility_timeout': 'pups'}


编辑

我更改了加载芹菜配置的方式(来自单独的配置文件)。似乎现在可以工作,但是问题仍然相同。

celery_task = Celery(broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'])
celery_task.config_from_object('celeryconfig')


编辑

通过这些配置,我设法将所有任务的本地主机上的客户端数量限制为18(我尝试了全部12个任务)。但是在heroku上,它“以某种方式”起作用。客户数量较少,但一次达到20个,尽管我认为我不能超过18个。(我在heroku上测试了4个任务)。

使用所有12个任务在heroku上进行测试都会触发许多不同的SQL错误。我现在比以前更加困惑。似乎同一任务已执行多次,但我只能看到12个任务URL。

我认为这是因为SQL错误是f.e .:

sqlalchemy.exc.InternalError: (pymysql.err.InternalError) Packet sequence number wrong - got 117 expected 1


要么

sqlalchemy.exc.InterfaceError: (pymysql.err.InterfaceError) (0, '')


要么

Multiple rows were found for one()


我在heroku上用4个任务测试了几次,有时会返回任务结果,但是结果非常奇怪。

这次任务还没有完成得太早,但是返回了增加的值,看起来任务A已经返回了2次并求和。

示例:任务A必须返回10k,但是它返回20k,因此任务已执行两次,并且已对结果求和。

这是我当前的配置。我仍然不完全理解数学,但是我认为它(针对客户数量)为100%:

max-conncurency * CELERYD_MAX_TASKS_PER_CHILD


在localhost上,我找到了一个新的CLI命令来检查工作程序统计信息,并且我有 max-conncurecy=3CELERYD_MAX_TASKS_PER_CHILD=6

CLI命令:

celery -A stats_main.celery_task inspect stats


我当前的配置:

工人开始:

celery worker -A stats_main.celery_task --loglevel=info --autoscale=10,3


配置:

CELERY_REDIS_MAX_CONNECTIONS=20
BROKER_POOL_LIMIT=None
CELERYD_WORKER_LOST_WAIT=20
CELERYD_MAX_TASKS_PER_CHILD=6
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True #useful if: For example, intermittent errors like (OperationalError) (2006, ‘MySQL server has gone away’)


编辑

看到所有这些SQL错误之后,我决定研究一个完全不同的方向。我的新理论是,这可能是一个 MySQL问题。

我按照 this question的答案中所述调整了与MySQL服务器的连接。

我还发现pymsql具有 threadsafety=1,我还不知道这是否可能是一个问题,但是MySQL似乎与连接和连接池有关。

此刻,我还可以说内存不是问题,因为如果软件包太大,它就不能在localhost上运行,这意味着我将 max_allowed_packet保留为默认值,约为4MB。

我还创建了3个虚拟任务,这些任务无需连接到外部MySQL DB即可进行一些简单的计算。我现在已经在heroku上执行了5次,并且没有错误,结果始终是正确的,所以我认为问题不是celery,redis,而是MySQL,尽管我不知道为什么它可以在本地主机上工作。可能是所有这三个因素的组合,这导致了Heroku上的问题。

编辑

我调整了JS文件。现在,每个任务都一个接一个地调用,这意味着它们并不异步(我仍然使用celery的 apply_async,因为 apply无效)

因此,这是一个困难的解决方法。我只是为每个任务创建了一个 var,例如 var task_1_rdy = false;

我还创建了一个函数,该函数每2秒运行一次,并检查一个任务是否准备就绪,如果准备就绪,它将启动下一个任务。我认为我在这里所做的事情很容易理解。

在heroku上进行了测试,即使有多个任务,也没有任何错误,因此也许可以解决该问题。我需要进行更多测试,但看起来很有希望。 Ofc。我没有使用异步功能,在任务运行之后可能会表现最差,但是嘿,它现在可以工作了。我将对性能差异进行基准测试,并在星期一更新问题。

编辑

我今天做了很多测试。完成任务所需的时间是相同的(同步与异步),我不知道为什么,但是它是相同的。

在heroku上处理所有12个任务并选择一个巨大的时间范围(巨大的时间范围=任务需要更长的时间,因为要处理的数据更多):

同样,任务结果不精确,返回的值是错误的,只是略有错误,但是错误并且因此不可靠,例如任务A必须返回20k,在heroku上必须返回19500。我不知道数据丢失的可能性/任务返回得太早,但是2周后,我将放弃并尝试使用完全不同的系统。

最佳答案

听起来像您使用celery worker redis作为味精队列的rest-api。
这是chk列表:

您的客户端中的1逻辑完成后是否关闭了连接

2芹菜会新来的工人,这些工人可能会造成麻烦,请尝试用花监控芹菜

3确保您的客户完成任务,尝试调试并打印一些内容,有时会暂存并且本地出现网络问题,这使您无法完成芹菜任务

4如果您将redis用于芹菜味精队列,请尝试监视队列数,也许它们会自动扩大?

关于python - Python Redis和celery客户端过多,每次执行时都会出现不同的错误|任务使用pymsql连接到MySQL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53321145/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com