gpt4 book ai didi

ruby - Sidekiq 的大任务或多个小任务

转载 作者:太空宇宙 更新时间:2023-11-03 17:47:39 25 4
gpt4 key购买 nike

我正在写一个工作人员将很多用户添加到一个组中。我想知道是运行一个拥有所有用户的大任务更好,还是像 100 个用户那样批量运行,或者每个任务一个一个地运行。

目前这是我的代码

class AddUsersToGroupWorker
include Sidekiq::Worker
sidekiq_options :queue => :group_utility

def perform(store_id, group_id, user_ids_to_add)
begin
store = Store.find store_id
group = Group.find group_id
rescue ActiveRecord::RecordNotFound => e
Airbrake.notify e
return
end

users_to_process = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
group.users += users_to_process

users_to_process.map(&:id).each do |user_to_process_id|
UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
end
end
end

也许在我的方法中加入这样的东西会更好:

def add_users
users_to_process = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)

users_to_process.map(&:id).each do |user_to_process_id|
AddUserToGroupWorker.perform_async group_id, user_to_process_id
UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
end
end

但是有那么多find请求。你怎么看?

如果需要(例如批处理),我有一个 sidekig pro 许可证。

最佳答案

这是我的想法。

<强>1。执行单个 SQL 查询而不是 N 个查询

这一行:group.users += users_to_process 可能会产生 N 个 SQL 查询(其中 N 是 users_to_process.count)。我假设您在用户和组之间有多对多连接(使用 user_groups 连接表/模型),因此您应该使用一些 Mass inserting data technique :

users_to_process_ids = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
.pluck(:id)
sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
Group.connection.execute("
INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
VALUES #{sql_values.join(",")}
")

是的,它是原始 SQL。而且它很快

<强>2。用户 pluck(:id) 而不是 map(&:id)

pluck 更快,因为:

  • 它将只选择“id”列,因此从数据库传输的数据较少
  • 更重要的是,它不会为每个原始对象创建 ActiveRecord 对象

做 SQL 很便宜。创建 Ruby 对象非常昂贵。

<强>3。使用水平并行化而不是垂直并行化

我在这里的意思是,如果您需要为十几个记录执行顺序任务 A -> B -> C,有两种主要的拆分工作的方法:

  • 垂直分割AWorker 执行A(1)A(2)A(3)BWorkerB(1)等; CWorker 完成所有 C(i) 工作;
  • 水平分割UniversalWorker 执行 A(1)+B(1)+C(1)

使用后一种(水平)方式。

这是经验的陈述,而不是从某些理论的角度(两种方式都可行)。

为什么要这样做?

  • 当您使用垂直分割时,当您将工作从一名员工传递给另一名员工时,您很可能会出错。喜欢such kind of errors .如果你遇到这样的错误,你会 panic ,因为它们不是持久的并且很容易重现。有时会发生,有时不会。是否可以编写一个代码,将工作无误地传递到链中?就是这样。但最好还是keep it simple .
  • 假设您的服务器处于静止状态。然后突然有新的工作机会到来。您的 BC worker 只会浪费 RAM,而您的 A worker 会完成这项工作。然后你的 AC 将浪费 RAM,而 B 正在工作。等等。如果您进行水平分割,您的资源消耗将自行消失。

将该建议应用于您的具体情况:对于初学者,不要在另一个异步任务中调用 perform_async

<强>4。批量处理

回答您最初的问题 – 是的,分批处理。创建和管理异步任务本身会占用一些资源,因此无需创建太多。


TL;DR 所以最后,您的代码可能如下所示:

# model code

BATCH_SIZE = 100

def add_users
users_to_process_ids = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
.pluck(:id)
# With 100,000 users performance of this query should be acceptable
# to make it in a synchronous fasion
sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
Group.connection.execute("
INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
VALUES #{sql_values.join(",")}
")

users_to_process_ids.each_slice(BATCH_SIZE) do |batch|
AddUserToGroupWorker.perform_async group_id, batch
end
end

# add_user_to_group_worker.rb

def perform(group_id, user_ids_to_add)
group = Group.find group_id

# Do some heavy load with a batch as a whole
# ...
# ...
# If nothing here is left, call UpdateLastUpdatesForUserWorker from the model instead

user_ids_to_add.each do |id|
# do it synchronously – we already parallelized the job
# by splitting it in slices in the model above
UpdateLastUpdatesForUserWorker.new.perform store.id, user_to_process_id
end
end

关于ruby - Sidekiq 的大任务或多个小任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32198647/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com