
flask - Celery + SQS receiving the same task twice, with the same task ID


I am using Celery with SQS in a Flask application, but Celery receives the same task, with the same task ID, twice at the same time.

The worker is started like this:

celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8

Here is the Celery log:

[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp
The same task is received twice, and both copies run at the same time.

I am using celery==4.4.0rc4, boto3==1.9.232, kombu==4.6.6 with SQS in a Python Flask app. In SQS the default visibility timeout is 30 minutes, and my task has no ETA and is not acked.
My task.py:
from flask import request  # needed for request.json below; missing in the original
from app import app as flask_app
from app.jobs.run import capp
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy(flask_app)

class BookingTasks:
    def addBookingToTask(self):
        request_data = request.json
        print('in addBookingToTask', request_data['request_id'])
        print(request_data)
        bookFlightTask.delay(request_data)
        return 'addBookingToTask added'

@capp.task(max_retries=0)
def bookFlightTask(request_data):
    task_id = capp.current_task.request.id
    try:
        print('in booking funtion1')
----
My config file config.py:
import os
from urllib.parse import quote_plus

aws_access_key = quote_plus(os.getenv('AWS_ACCESS_KEY'))
aws_secret_key = quote_plus(os.getenv('AWS_SECRET_KEY'))

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
imports = ('app.jobs.run',)


# Use the database to store task state and results.
result_backend = 'db+' + os.getenv('SQLALCHEMY_DATABASE_URI')
And finally my Celery application file run.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from flask import Flask
from app import app as flask_app
import sqlalchemy

capp = Celery()

capp.config_from_object('app.jobs.config')

# Optional configuration, see the application user guide.
capp.conf.update(
    result_expires=3600,
)

# FLIGHT_BOOKINNG_SQS_QUEUE_NAME is like 'celery_test.fifo'; the .fifo suffix is required
capp.conf.task_default_queue = os.getenv('FLIGHT_BOOKINNG_SQS_QUEUE_NAME')

if __name__ == '__main__':
    capp.start()

Best answer

The default SQS visibility_timeout is 30 seconds. You need to update Celery's configuration with broker_transport_options={'visibility_timeout': 3600}.

When Celery goes to create the queue, it will then set the visibility timeout to 1 hour.
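
For instance, extending the config.py from the question (a minimal sketch; 3600 seconds is the value suggested above, and it should exceed your longest task's runtime):

# config.py (continued)
# kombu's SQS transport reads this option when declaring the queue:
# a message fetched by one worker stays invisible to other workers
# for an hour instead of SQS's 30-second default.
broker_transport_options = {'visibility_timeout': 3600}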

Note: if you specify task_default_queue and the queue was already created without broker_transport_options={'visibility_timeout': 3600}, then Celery will not update the visibility timeout when restarted with broker_transport_options={'visibility_timeout': 3600}. You will need to delete the queue and let Celery recreate it.
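
Alternatively, if deleting the queue is awkward, the existing queue's attribute can be raised directly with boto3, which the question already uses. A sketch, assuming AWS credentials in the environment and the queue name from the FLIGHT_BOOKINNG_SQS_QUEUE_NAME variable seen in run.py:

import os
import boto3

sqs = boto3.client('sqs')

# Look up the queue that Celery uses as task_default_queue.
queue_url = sqs.get_queue_url(
    QueueName=os.getenv('FLIGHT_BOOKINNG_SQS_QUEUE_NAME')
)['QueueUrl']

# SQS expects the timeout as a string, in seconds; 3600 matches
# the broker_transport_options value above.
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'VisibilityTimeout': '3600'},
)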

Regarding flask - Celery + SQS receiving the same task twice, with the same task ID, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/59123536/
