gpt4 book ai didi

python - MySQL 在 celery 任务期间不断失去连接

转载 作者:行者123 更新时间:2023-11-29 22:21:53 25 4
gpt4 key购买 nike

我正在尝试尽快处理整个 csv 文件,因此我希望将每一行作为 celery 任务并行处理。清理也是一个 celery 任务,必须等到每一行都处理完毕。请参阅下面的示例。

问题是,我似乎无法访问文件,因为我一直遇到 MySQL 连接错误。到目前为止,我已经看到这两个错误:2013,“查询期间失去与 MySQL 服务器的连接”2006,“MySQL 服务器已消失”

from app.db.meta import Session
from celery import chord, Celery
from celery.signals import task_postrun

celery = Celery()
celery.config_from_object('config')

@task_postrun.connect
def close_session(*args, **kwargs):
Session.remove()

def main():
# process each line in parallel
header = [process_line.s(line) for line in csv_file]
# pass stats to cleanup after all lines are processed
callback = cleanup.s()
chord(header)(callback)

@celery.task
def process_line(line):
session = Session()
...
# process line
...
return stats

@celery.task
def cleanup(stats):
session = Session()
...
# do cleanup and log stats
...

我正在使用 celery 3.1.18 和 SQLAlchemy 0.9.9。我也在使用连接池。

mysql> SHOW FULL PROCESSLIST;                                                                  
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
| Id | User | Host | db | Command | Time | State | Info |
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
| 1 | root | localhost | ab__development | Sleep | 4987 | | NULL |
| 11 | root | localhost | ab__development | Sleep | 1936 | | NULL |
| 16 | root | localhost | ab__development | Sleep | 143 | | NULL |
| 17 | root | localhost | ab__development | Sleep | 1045 | | NULL |
| 18 | root | localhost | NULL | Query | 0 | init | SHOW FULL PROCESSLIST |
| 21 | root | localhost | ab__development | Sleep | 7 | | NULL |
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
6 rows in set (0.01 sec)

最佳答案

Read the answer 。简而言之,您必须禁用 SQLAlchemy's Pool engine或者尝试 ping mysql 服务器:

from flask.ext.sqlalchemy import SQLAlchemy
from sqlalchemy import event, exc


def instance(app):
""":rtype: SQLAlchemy"""
db = SQLAlchemy(app)

if app.testing:
return db

@event.listens_for(db.engine, 'checkout')
def checkout(dbapi_con, con_record, con_proxy):
try:
try:
dbapi_con.ping(False)
except TypeError:
app.logger.debug('MySQL connection died. Restoring...')
dbapi_con.ping()
except dbapi_con.OperationalError as e:
app.logger.warning(e)
if e.args[0] in (2006, 2013, 2014, 2045, 2055):
raise exc.DisconnectionError()
else:
raise

return db

关于python - MySQL 在 celery 任务期间不断失去连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30630120/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com