- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我拥有的:
我使用rabbitMQ作为代理运行celery,并使用redis作为结果后端。我有一个发送任务的应用程序和处理任务的工作人员。
我的部署如下:
首先,如果您对此架构有建议,我将很高兴获得建议。
问题:
一切正常,任务被分配给不同的工作人员,这些工作人员会发回结果等...
当任务在 30 分钟后发送且没有任务运行时,当任务未发送到“local_worker”时,我观察到 Redis 延迟为 2 分钟。
这就像redis或VM在发送回结果之前休眠了2分钟。因为它正好是 120 秒(2 分钟),所以我希望它是 redis、celery 或 azure 想要的东西(确定性的东西)
我不使用redis conf文件,仅使用默认设置(密码除外)来运行redis服务器。
感谢您对我的架构和问题的帮助和反馈。
这是我在花中看到的屏幕截图。这三个任务是相同的(删除目录)。
第一个和第三个任务已由本地工作人员处理。第二个已由外部工作人员处理。在外部工作人员的日志上,我在返回结果之前放置了一条打印行,该行已在 14:14:23 打印。因此,此打印与任务正式结束之间相隔 120 秒。
编辑:
我发现redis_socket_timeout的默认值为120秒。
我删除了行 redis_retry_on_timeout = True
并在我的 celery 配置文件中添加了行 redis_socket_keepalive = True
。现在我得到的错误是任务失败,并出现 redis.exceptions.TimeoutError: Timeout Reading from socket
。我不知道为什么结果已准备好,套接字却超时。是不是我的容器实例的网络有问题?
这是我的 docker-compose:
version: "3.5"
services:
rabbitmq:
image: rabbitmq:3.8-management
restart: always
ports:
- 5672:5672
labels:
- traefik.enable=true
- traefik.http.services.rabbitmq-ui.loadbalancer.server.port=15672
- traefik.http.routers.rabbitmq-ui-http.entrypoints=http
- traefik.http.routers.rabbitmq-ui-http.rule=(Host(`rabbitmq.${HOSTNAME?Variable not set}.example.app`))
- traefik.docker.network=traefik-public
- traefik.http.routers.rabbitmq-ui-https.entrypoints=https
- traefik.http.routers.rabbitmq-ui-https.rule=Host(`rabbitmq.${HOSTNAME?Variable not set}.example.app`)
- traefik.http.routers.rabbitmq-ui-https.tls=true
- traefik.http.routers.rabbitmq-ui-https.tls.certresolver=le
- traefik.http.routers.rabbitmq-ui-http.middlewares=https-redirect
env_file:
- .env
environment:
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
networks:
- traefik-public
redis:
image: redis:6.2.5
restart: always
command: ["redis-server", "--requirepass", "${RABBITMQ_DEFAULT_PASS:-password}"]
ports:
- 6379:6379
networks:
- traefik-public
flower:
image: mher/flower:0.9.5
restart: always
labels:
- traefik.enable=true
- traefik.http.services.flower-ui.loadbalancer.server.port=5555
- traefik.http.routers.flower-ui-http.entrypoints=http
- traefik.http.routers.flower-ui-http.rule=Host(`flower.${HOSTNAME?Variable not set}.example.app`)
- traefik.docker.network=traefik-public
- traefik.http.routers.flower-ui-https.entrypoints=https
- traefik.http.routers.flower-ui-https.rule=Host(`flower.${HOSTNAME?Variable not set}.example.app`)
- traefik.http.routers.flower-ui-https.tls=true
- traefik.http.routers.flower-ui-https.tls.certresolver=le
- traefik.http.routers.flower-ui-http.middlewares=https-redirect
- traefik.http.routers.flower-ui-https.middlewares=traefik-admin-auth
env_file:
- .env
command:
- "--broker=amqp://${RABBITMQ_DEFAULT_USER:-guest}:${RABBITMQ_DEFAULT_PASS:-guest}@rabbitmq:5672//"
depends_on:
- rabbitmq
- redis
networks:
- traefik-public
local_worker:
build:
context: ..
dockerfile: ./setup/devops/docker/app.dockerfile
image: swtools:app
restart: always
volumes:
- ${SWTOOLSWORKINGDIR:-/tmp}:${SWTOOLSWORKINGDIR:-/tmp}
command: ["celery", "--app=app.worker.celery_app:celery_app", "worker", "-n", "local_worker@%h"]
env_file:
- .env
environment:
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
- RABBITMQ_HOST=rabbitmq
- REDIS_HOST=${HOSTNAME?Variable not set}
depends_on:
- rabbitmq
- redis
networks:
- traefik-public
dashboard_app:
image: swtools:app
restart: always
labels:
- traefik.enable=true
- traefik.http.services.dash-app.loadbalancer.server.port=${DASH_PORT-8080}
- traefik.http.routers.dash-app-http.entrypoints=http
- traefik.http.routers.dash-app-http.rule=Host(`dashboard.${HOSTNAME?Variable not set}.example.app`)
- traefik.docker.network=traefik-public
- traefik.http.routers.dash-app-https.entrypoints=https
- traefik.http.routers.dash-app-https.rule=Host(`dashboard.${HOSTNAME?Variable not set}.example.app`)
- traefik.http.routers.dash-app-https.tls=true
- traefik.http.routers.dash-app-https.tls.certresolver=le
- traefik.http.routers.dash-app-http.middlewares=https-redirect
- traefik.http.middlewares.operator-auth.basicauth.users=${OPERATOR_USERNAME?Variable not set}:${HASHED_OPERATOR_PASSWORD?Variable not set}
- traefik.http.routers.dash-app-https.middlewares=operator-auth
volumes:
- ${SWTOOLSWORKINGDIR:-/tmp}:${SWTOOLSWORKINGDIR:-/tmp}
command: ['waitress-serve', '--port=${DASH_PORT:-8080}', 'app.order_dashboard:app.server']
env_file:
- .env
environment:
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
- RABBITMQ_HOST=rabbitmq
- REDIS_HOST=${HOSTNAME?Variable not set}
networks:
- traefik-public
depends_on:
- rabbitmq
- redis
networks:
traefik-public:
external: true
和我的 celery 配置文件:
import os
import warnings
from pathlib import Path
# result backend use redis
result_backend_host = os.getenv('REDIS_HOST', 'localhost')
result_backend_pass = os.getenv('REDIS_PASS', 'password')
result_backend = 'redis://:{password}@{host}:6379/0'.format(password=result_backend_pass, host=result_backend_host)
# redis_retry_on_timeout = True
redis_socket_keepalive = True
# broker use rabbitmq
rabbitmq_user = os.getenv('RABBITMQ_DEFAULT_USER', 'guest')
rabbitmq_pass = os.getenv('RABBITMQ_DEFAULT_PASS', 'guest')
rabbitmq_host = os.getenv('RABBITMQ_HOST', 'localhost')
broker_url = 'amqp://{user}:{password}@{host}:5672//'.format(user=rabbitmq_user, password=rabbitmq_pass, host=rabbitmq_host)
include = ['app.worker.tasks', 'app.dashboard.example1', 'app.dashboard.example2']
#task events
worker_send_task_events = True
task_send_sent_event = True
所有环境变量都已定义,并且运行良好,除了我的套接字超时问题!当我在容器实例上部署新工作人员时,我设置了环境变量,以便它连接到在 docker-compose 上运行的rabbitmq 和 redis。
这是我的 celery 文件,它定义了 celery 应用程序:
from celery import Celery
from app.worker import celery_config
celery_app = Celery()
celery_app.config_from_object(celery_config)
最佳答案
我猜你的 Redis 实例和工作线程之间有一些防火墙。您可以登录到 SandboxHost...
并确保可以连接您的 Redis 吗?
您可以使用 telnet
来做到这一点,例如:
telnet <your_redis_hostname> <your_redis_port>
或者使用 redis-cli:
redis-cli -h <your_redis_hostname> -p <your_redis_port>
编辑:
似乎您缺少result_backend :
result_backend = f"redis://username:{result_backend_pass}@{result_backend_host}:6379/0"
并确保您的 REDIS_HOST=${HOSTNAME?Variable not set}
有效...
编辑2:
您能否将 bind
添加到您的 Redis 命令中:
["redis-server", "--bind", "0.0.0.0", "--requirepass", "${RABBITMQ_DEFAULT_PASS:-password}"]
请注意其安全隐患!
关于azure - 工作人员使用 celery、redis 和rabbitMQ 发回结果有 2 分钟延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69778299/
我有一个关于 Redis Pubsub 的练习,如下所示: 如果发布者发布消息但订阅者没有收到服务器崩溃。订阅者如何在重启服务器时收到该消息? 请帮帮我,谢谢! 最佳答案 在这种情况下,消息将永远消失
我们正在使用 Service Stack 的 RedisClient 的 BlockingDequeue 来保存一些数据,直到它可以被处理。调用代码看起来像 using (var client =
我有一个 Redis 服务器和多个 Redis 客户端。每个 Redis 客户端都是一个 WebSocket+HTTP 服务器,其中包括管理 WebSocket 连接。这些 WebSocket+HTT
我有多个 Redis 实例。我使用不同的端口创建了一个集群。现在我想将数据从预先存在的 redis 实例传输到集群。我知道如何将数据从一个实例传输到集群,但是当实例多于一个时,我无法做到这一点。 最佳
配置:三个redis集群分区,跨三组一主一从。当 Master 宕机时,Lettuce 会立即检测到中断并开始重试。但是,Lettuce 没有检测到关联的 slave 已经将自己提升为 master
我想根据从指定集合中检索这些键来删除 Redis 键(及其数据集),例如: HMSET id:1 password 123 category milk HMSET id:2 password 456
我正在编写一个机器人(其中包含要禁用的命令列表),用于监视 Redis。它通过执行禁用命令,例如 (rename-command ZADD "")当我重新启动我的机器人时,如果要禁用的命令列表发生变化
我的任务是为大量听众使用发布/订阅。这是来自 docs 的订阅的简化示例: r = redis.StrictRedis(...) p = r.pubsub() p.subscribe('my-firs
我一直在阅读有关使用 Redis 哨兵进行故障转移的内容。我打算有1个master+1个slave,如果master宕机超过1分钟,就把slave变成master。我知道这在 Sentinel 中是
与仅使用常规 Redis 和创建分片相比,使用 Redis 集群有哪些优势? 在我看来,Redis Cluster 更注重数据安全(让主从架构解决故障)。 最佳答案 我认为当您需要在不丢失任何数据的情
由于 Redis 以被动和主动方式使 key 过期, 有没有办法得到一个 key ,即使它的过期时间已过 (但 在 Redis 中仍然存在 )? 最佳答案 DEBUG OBJECT myKey 将返回
我想用redis lua来实现monitor命令,而不是redis-cli monitor。但我不知道怎么办。 redis.call('monitor') 不起作用。 最佳答案 您不能从 Redis
我读过 https://github.com/redisson/redisson 我发现有几个 Redis 复制设置(包括对 AWS ElastiCache 和 Azure Redis 缓存的支持)
Microsoft.AspNet.SignalR.Redis 和 StackExchange.Redis.Extensions.Core 在同一个项目中使用。前者需要StackExchange.Red
1. 认识 Redis Redis(Remote Dictionary Server)远程词典服务器,是一个基于内存的键值对型 NoSQL 数据库。 特征: 键值(key-value)型,value
1. Redis 数据结构介绍 Redis 是一个 key-value 的数据库,key 一般是 String 类型,但 value 类型多种多样,下面就举了几个例子: value 类型 示例 Str
1. 什么是缓存 缓存(Cache) 就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。 缓存的作用: 降低后端负载 提高读写效率,降低响应时间 缓存的成本: 数据一致性成本 代码维护成本
我有一份记录 list 。对于我的每条记录,我都需要进行一些繁重的计算,因为我要在Redis中创建反向索引。为了达到到达记录,需要在管道中执行多个redis命令(sadd为100 s + set为1
我有一个三节点Redis和3节点哨兵,一切正常,所有主服务器和从属服务器都经过验证,并且哨兵配置文件已与所有Redis和哨兵节点一起更新,但是问题是当Redis主服务器关闭并且哨兵希望选举失败者时再次
我正在尝试计算Redis中存储的消息之间的响应时间。但是我不知道该怎么做。 首先,我必须像这样存储chat_messages的时间流 ZADD conversation:CONVERSATION_ID
我是一名优秀的程序员,十分优秀!