- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
当前设置:celery 在 EC2 节点上的 docker 容器(使用我们的产品代码)上运行,创建和处理任务。我们的后端/代理是 Redis,在 AWS 的 elasticache 中运行。
目标:能够在任何给定时间查看队列大小(类似于 flower 的监控),希望通过 AWS CloudWatch,但不是必需的。任务内容不相关,因为我熟悉备份 redis 实例,并且可以使用本地工具解析备份以进行任何需要的分析。短期历史数据是首选(CloudWatch 可以追溯到 2 周,并且具有 1 分钟数据点的粒度,这非常好)。
根据我对 Flower 工作方式的了解,由于我们目前存在的安全组/限制数量,Flower 不可行。此外,flower 仅在您访问该页面时进行监控,因此不会保存任何历史数据。
Elasticache 已经在 CloudWatch 中内置了 redis 中的项目数量。在我看来,这是实现目标的最佳途径。然而,目前队列代表 redis 中的一个项目(无论队列中有多少任务)。以下是解析为 json 的 redis 备份示例:
[{
"1.api_data__cached_api_route.000":"{\"i1\": 0, \"i2\": 1, \"i3\": 0}",
"1.api_data__cached_api_route.001":"{\"i1\": 0, \"i2\": 0, \"i3\": 0}",
"1.api_data__cached_api_route.002":"{\"i1\": 1, \"i2\": 1, \"i3\": 0}",
"staging_event_default":["{\"id\":\"b28b056c-1268-4577-af8a-9f1948860502\", \"task\":{...}}, "{\"id\":\"52668c46-3972-457a-be3a-6e27eedd26e3\", \"task\":{...}}]
}]
Cloudwatch 将其视为 4 个项目、3 个缓存的 api 路由和 1 个队列。即使队列有数千个项目,它仍会显示为 4 个项目。 #(队列中的项目)和#(队列中的项目和其他缓存的项目)之间的差异很好,因为这个监控工具将主要用于查看队列是否得到了可怕的备份,并且队列的大小会相形见绌缓存的 api 路由数。
要继续沿着这条路线前进,最简单的答案是如果 celery 有一个配置选项来使队列中的每个项目成为它自己的 redis 项目。如果对此有一个简单的修复或配置选项,它似乎是最容易实现的。这是我们当前的 celery 配置选项:
flask_app.config.update(
CELERY_BROKER_URL=redis_host,
CELERY_RESULT_BACKEND=redis_host,
CELERY_QUEUES=queue_manager.queues,
CELERY_ROUTES=queue_manager.routes,
CELERY_DEFAULT_QUEUE=queue_manager.default_queue_name,
CELERY_DEFAULT_EXCHANGE=queue_manager.default_exchange_name)
_celery = celery.Celery(flask_app.import_name,
backend=flask_app.config['CELERY_RESULT_BACKEND'],
broker=flask_app.config['CELERY_BROKER_URL'])
opts = {
'broker_url': redis_host,
'result_backed': redis_host,
'task_queues': queue_manager.queues,
'task_routes': queue_manager.routes,
'task_default_queue': queue_manager.default_queue_name,
'task_default_exchange': queue_manager.default_exchange_name,
'worker_send_task_events': True,
'task_ignore_result': True,
'task_store_errors_even_if_ignored': True,
'result_expires': 3600,
'worker_redirect_stdouts': False,
'beat_scheduler': redbeat.RedBeatScheduler,
'beat_max_loop_interval': 5
}
_celery.conf.update(opts)
我遇到的另一个选项是 celery-cloudwatch-logs ,这似乎与我想要实现的目标一致,但似乎更旨在查看每项任务的特定内容,而不是汇总(但我可能在那里错了)。
如果没有满足目标的完美/简单的解决方案,我将研究 fork /构建 celery-cloudwatch 以仅上传相关信息。我们的团队继承了目前存在的大部分代码,我对 celery 的工作原理有基本的了解,但并不深入。
提前感谢任何人的想法、评论和帮助!
最佳答案
如果有人碰巧遇到它,我会在这里发布我所做的事情。
我们已经在应用程序的其他地方安装并配置了 boto3 以进行 S3 访问,这使得发布到 CloudWatch 变得非常容易。
我在 Celery
类中添加了一个方法,使用 redis 模块中的 llen
检查队列的长度:
@staticmethod
def check_lengths():
result = {}
for q in Celery._queues:
result[q] = Celery._redis.llen(q)
return result
然后发布到 Cloudwatch 也相当容易:
namespace = "Celery/Queue"
metrics = []
for qname, qlen in data.items():
metric = {}
metric["MetricName"] = "ItemsInQueue"
metric["Dimensions"] = [ {"Name": "QueueName", "Value": qname} ]
metric["Value"] = qlen
metric["Unit"] = "Count"
metrics.append(metric)
self.cw_client.put_metric_data(Namespace=namespace, MetricData=metrics)
然后我最终使用 AWS Lambda 将网络请求发送到端点,然后将上述数据发布到 CloudWatch。
关于amazon-web-services - Celery Redis 后端-使队列中的任务作为项目存在,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54525396/
我有一个关于 Redis Pubsub 的练习,如下所示: 如果发布者发布消息但订阅者没有收到服务器崩溃。订阅者如何在重启服务器时收到该消息? 请帮帮我,谢谢! 最佳答案 在这种情况下,消息将永远消失
我们正在使用 Service Stack 的 RedisClient 的 BlockingDequeue 来保存一些数据,直到它可以被处理。调用代码看起来像 using (var client =
我有一个 Redis 服务器和多个 Redis 客户端。每个 Redis 客户端都是一个 WebSocket+HTTP 服务器,其中包括管理 WebSocket 连接。这些 WebSocket+HTT
我有多个 Redis 实例。我使用不同的端口创建了一个集群。现在我想将数据从预先存在的 redis 实例传输到集群。我知道如何将数据从一个实例传输到集群,但是当实例多于一个时,我无法做到这一点。 最佳
配置:三个redis集群分区,跨三组一主一从。当 Master 宕机时,Lettuce 会立即检测到中断并开始重试。但是,Lettuce 没有检测到关联的 slave 已经将自己提升为 master
我想根据从指定集合中检索这些键来删除 Redis 键(及其数据集),例如: HMSET id:1 password 123 category milk HMSET id:2 password 456
我正在编写一个机器人(其中包含要禁用的命令列表),用于监视 Redis。它通过执行禁用命令,例如 (rename-command ZADD "")当我重新启动我的机器人时,如果要禁用的命令列表发生变化
我的任务是为大量听众使用发布/订阅。这是来自 docs 的订阅的简化示例: r = redis.StrictRedis(...) p = r.pubsub() p.subscribe('my-firs
我一直在阅读有关使用 Redis 哨兵进行故障转移的内容。我打算有1个master+1个slave,如果master宕机超过1分钟,就把slave变成master。我知道这在 Sentinel 中是
与仅使用常规 Redis 和创建分片相比,使用 Redis 集群有哪些优势? 在我看来,Redis Cluster 更注重数据安全(让主从架构解决故障)。 最佳答案 我认为当您需要在不丢失任何数据的情
由于 Redis 以被动和主动方式使 key 过期, 有没有办法得到一个 key ,即使它的过期时间已过 (但 在 Redis 中仍然存在 )? 最佳答案 DEBUG OBJECT myKey 将返回
我想用redis lua来实现monitor命令,而不是redis-cli monitor。但我不知道怎么办。 redis.call('monitor') 不起作用。 最佳答案 您不能从 Redis
我读过 https://github.com/redisson/redisson 我发现有几个 Redis 复制设置(包括对 AWS ElastiCache 和 Azure Redis 缓存的支持)
Microsoft.AspNet.SignalR.Redis 和 StackExchange.Redis.Extensions.Core 在同一个项目中使用。前者需要StackExchange.Red
1. 认识 Redis Redis(Remote Dictionary Server)远程词典服务器,是一个基于内存的键值对型 NoSQL 数据库。 特征: 键值(key-value)型,value
1. Redis 数据结构介绍 Redis 是一个 key-value 的数据库,key 一般是 String 类型,但 value 类型多种多样,下面就举了几个例子: value 类型 示例 Str
1. 什么是缓存 缓存(Cache) 就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。 缓存的作用: 降低后端负载 提高读写效率,降低响应时间 缓存的成本: 数据一致性成本 代码维护成本
我有一份记录 list 。对于我的每条记录,我都需要进行一些繁重的计算,因为我要在Redis中创建反向索引。为了达到到达记录,需要在管道中执行多个redis命令(sadd为100 s + set为1
我有一个三节点Redis和3节点哨兵,一切正常,所有主服务器和从属服务器都经过验证,并且哨兵配置文件已与所有Redis和哨兵节点一起更新,但是问题是当Redis主服务器关闭并且哨兵希望选举失败者时再次
我正在尝试计算Redis中存储的消息之间的响应时间。但是我不知道该怎么做。 首先,我必须像这样存储chat_messages的时间流 ZADD conversation:CONVERSATION_ID
我是一名优秀的程序员,十分优秀!