gpt4 book ai didi

python - 使用来自 Celery 的 SQS 消息

转载 作者:行者123 更新时间:2023-12-01 14:02:53 40 4
gpt4 key购买 nike

我的 Python 应用程序中有一个 Celery 实现。我使用的经纪商是 SQS。发送到 SQS 的消息来自不同的应用程序,通过 Boto3 的 send_message() api。现在我的困惑是如何触发 Celery 从 SQS 中挑选消息进行处理。将有一些任务将在 Celery 中运行,它应该正确处理来自 SQS 的消息。我的要求类似于 Celery Consumer SQS Messages .

据我了解,Celery 轮询 SQS 直到消息到达那里。有人可以帮我解决这个问题吗?

最佳答案

我每 20 秒调用一次此任务:

@app.task(name='listen_to_sqs_telemetry')
def listen_to_sqs_telemetry():
logger.info('start listen_to_telemetry')
sqs = get_sqs_client()
queue_url = 'https://sqs.us-east-2.amazonaws.com/xxx'
logger.info('Using ' + queue_url)

keep_going = True
num = 0
while keep_going:
keep_going = False
try:
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=[
'SentTimestamp',
],
MaxNumberOfMessages=5,
MessageAttributeNames=[
'All'
],
WaitTimeSeconds=20
)
# logger.info(response)
if 'Messages' in response:
keep_going = True
for rec in response['Messages']:
# Process message
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=rec['ReceiptHandle']
)
num = num + 1
else:
pass
# logger.info(response)
except Exception as e:
logger.error(str(e))
logger.info('done with listen_to_sqs_telemetry')
return "Processed {} message(s)".format(num)

关于python - 使用来自 Celery 的 SQS 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55456337/

40 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com