gpt4 book ai didi

ruby - 如何最好地保持作业队列清理重试/重复作业(使用 sidekiq 和 redis-semaphore)

转载 作者:IT王子 更新时间:2023-10-29 05:57:19 28 4
gpt4 key购买 nike

我有一个 Rails 应用程序,可以从多个 IMAP 帐户获取大量电子邮件。

  • 我使用 sidekiq 来处理这些工作。
  • 我使用 sidetiq 来安排作业。
  • 我使用 redis-semaphore 来确保同一用户的重复作业不会相互偶然发现。

虽然有 2 个问题:

  • 1:当一个作业命中“if s.lock”时,redis-semaphore 将其暂停,直到之前的所有作业都完成。我需要取消作业而不是排队。
  • 2:如果在作业期间出现异常,导致崩溃,sidekiq 会将作业放回队列中重试。我需要取消作业而不是排队。将“sidekiq_options :retry => false”放入代码中似乎没有什么不同。

我的代码:

class FetchMailsJobs
include Sidekiq::Worker
include Sidetiq::Schedulable

tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

def perform(last_occurrence, current_occurrence)
users = User.all
users.each do |user|

if user.imap_accounts.exists?
ImapJob.perform_async(user._id.to_s)
end
end
end
end

class ImapJob
include Sidekiq::Worker

def perform(user_id)
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
if s.lock
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
s.unlock
end
end
end

最佳答案

1。创建 Redis子类和重载 blpop接受-1用于非阻塞使用 lpop .

redis-semaphore 调用 @redis.blpop Redis::Semaphore#lock .虽然你可以重载 lock使用方法@redis.lpop相反,一种更简单的方法是传递 Redis 的自定义实例到信号量。

将以下内容放在 lib 中你的 Rails 应用程序并在你的 config/initializers/sidekiq.rb 中需要它(或者做任何你喜欢加载以下类的事情)。

class NonBlockingRedis < Redis
def blpop(key, timeout)
if timeout == -1
result = lpop(key)
return result if result.nil?
return [key, result]
else
super(key, timeout)
end
end
end

每当您调用 Redis::Semaphore.new ,传递一个 :redis键与 NonBlockingRedis 的新实例类。

调用s.lock-1作为参数使用 lpop而不是 blpop .

s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock -1
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
s.unlock
end

2。使用 sidekiq_options retry: false在你的 worker 类(Class)应该工作,见下面的例子。

在您的问题中,您没有指定哪个工作人员遇到作业在重试队列中结束的问题。自 FetchMailsJobs结束排队 ImapJob作业,前者中的异常可能会导致显示 ImapJob正在重新排队。

使用信号量锁,将您的工作包装在 begin rescue ensure 中也是一个好主意。 block 。

class FetchMailsJobs
include Sidekiq::Worker
include Sidetiq::Schedulable

sidekiq_options retry: false

tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

def perform(last_occurrence, current_occurrence)
users = User.all
users.each do |user|

if user.imap_accounts.exists?
ImapJob.perform_async(user._id.to_s)
end
end
end
end

class ImapJob
include Sidekiq::Worker

sidekiq_options retry: false

def perform(user_id)
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock - 1
begin
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
rescue => e
# ignore; do nothing
ensure
s.unlock
end
end
end
end

参见 sidekiq Advanced Options: workers获取更多信息。

关于ruby - 如何最好地保持作业队列清理重试/重复作业(使用 sidekiq 和 redis-semaphore),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16379843/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com