gpt4 book ai didi

redis - 使用 Redis 的可扩展延迟任务执行

转载 作者:IT王子 更新时间:2023-10-29 05:54:50 25 4
gpt4 key购买 nike

我需要设计一个Redis驱动的可伸缩任务调度系统。

要求:

  • 多个工作进程。
  • 任务很多,但长时间闲置是可能的。
  • 合理的计时精度。
  • 空闲时资源浪费最少。
  • 应该使用同步 Redis API。
  • 应该适用于 Redis 2.4(即没有即将推出的 2.6 的功能)。
  • 不应使用 Redis 以外的其他 RPC 方式。

伪 API:schedule_task(timestamp, task_data)。时间戳以整数秒为单位。

基本思路:

  • 听取列表中即将执行的任务。
  • 根据时间戳将任务放入桶中。
  • 休眠直到最接近的时间戳。
  • 如果出现新任务,其时间戳小于最接近的任务,则唤醒。
  • 分批处理时间戳≤现在的所有即将到来的任务(假设任务执行速度很快)。
  • 确保并发工作人员不会处理相同的任务。同时,确保如果我们在处理任务时崩溃,不会丢失任何任务。

到目前为止,我还不知道如何将其放入 Redis 基元中...

有什么线索吗?

请注意,有一个类似的老问题:Delayed execution / scheduling with Redis?在这个新问题中,我介绍了更多细节(最重要的是,很多 worker )。到目前为止,我无法弄清楚如何在这里应用旧答案——因此,一个新问题。

最佳答案

这是另一个基于其他几个 [1] 的解决方案。它使用 redis WATCH 命令来消除竞争条件,而无需在 redis 2.6 中使用 lua。

基本方案是:

  • 将 redis zset 用于计划任务,将 redis 队列用于准备好运行的任务。
  • 让调度员轮询 zset 并将准备好运行的任务移动到 redis 队列中。您可能需要 1 个以上的调度员来实现冗余,但您可能不需要或不需要很多。
  • 拥有任意数量的 worker,这些 worker 会阻塞 redis 队列中的弹出消息。

我还没有测试过:-)

foo job creator 会做:

def schedule_task(queue, data, delay_secs):
# This calculation for run_at isn't great- it won't deal well with daylight
# savings changes, leap seconds, and other time anomalies. Improvements
# welcome :-)
run_at = time.time() + delay_secs

# If you're using redis-py's Redis class and not StrictRedis, swap run_at &
# the dict.
redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data})

schedule_task('foo_queue', foo_data, 60)

调度员看起来像:

while working:
redis.watch(SCHEDULED_ZSET_KEY)
min_score = 0
max_score = time.time()
results = redis.zrangebyscore(
SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False)
if results is None or len(results) == 0:
redis.unwatch()
sleep(1)
else: # len(results) == 1
redis.multi()
redis.rpush(results[0]['queue'], results[0]['data'])
redis.zrem(SCHEDULED_ZSET_KEY, results[0])
redis.exec()

foo worker 看起来像:

while working:
task_data = redis.blpop('foo_queue', POP_TIMEOUT)
if task_data:
foo(task_data)

[1] 此解决方案基于 not_a_golfer 的解决方案,位于 http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/。 ,以及交易的 redis 文档。

关于redis - 使用 Redis 的可扩展延迟任务执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10868552/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com