gpt4 book ai didi

multithreading - 合并 channel 中的项目

转载 作者:IT王子 更新时间:2023-10-29 01:36:33 24 4
gpt4 key购买 nike

我有一个接收任务并将它们放入 channel 的函数。每个任务都有 ID、一些属性和一个放置结果的 channel 。看起来像这样

task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)

另一个goroutine从 channel 中取出一个任务,处理它并将结果放入任务的 channel

task := <-queue
task.Result <- doExpensiveComputation(task)

这段代码工作正常。但现在我想合并 queue 中的任务。任务处理是一个非常昂贵的操作,所以我想处理一次队列中具有相同 ID 的所有任务。我看到两种方法。

第一个是不将具有相同 ID 的任务放入队列,因此当现有任务到达时,它会等待它的副本完成。这是伪代码

if newTask in queue {
existing := queue.getById(newTask.ID)
existing.waitForComplete()
sendResponse(existing.ProcessingResult)
} else {
queue.enqueue(newTask)
}

因此,我可以使用 go channel 和 map 实现随机访问 + 一些同步方式(如互斥)。我不喜欢这种方式的地方是我必须在代码中携带 map 和 channel 并保持它们的内容同步。

第二种方式是将所有任务放入队列,当结果到达时,将任务和所有具有相同ID的任务从队列中取出,然后将结果发送给所有任务。这是伪代码

someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
theSameTask.Result <- result
}

而且我知道如何使用与上述相同的方式使用 chan + map + mutex 来实现它。

问题是:是否有一些内置/现有的数据结构可用于此类问题?还有其他(更好的)方法吗?

最佳答案

如果我正确理解了这个问题,我想到的最简单的解决方案是在任务发送者(放入 queue )和工作人员(从 queue 获取)之间添加一个中间层。这可能是例行公事,负责存储当前任务(按 ID)并将结果广播到每个匹配的任务。

伪代码:

go func() {
active := make(map[TaskID][]Task)

for {
select {
case task := <-queue:
tasks := active[task.ID]
// No tasks with such ID, start heavy work
if len(tasks) == 0 {
worker <- task
}
// Save task for the result
active[task.ID] = append(active[task.ID], task)
case r := <-response:
// Broadcast to all tasks
for _, task := range active[r.ID] {
task.Result <- r.Result
}
}
}
}()

不需要互斥体,也可能不需要携带任何东西,工作人员只需将所有结果放入这个中间层,然后正确路由响应。如果冲突 ID 有可能在一段时间内到达,您甚至可以轻松地在此处添加缓存。

编辑: 我梦见上面的代码导致了死锁。如果您一次发送大量请求并阻塞 worker channel 有一个严重的问题——这个中间层例程卡在 worker <- task 上等待 worker 完成,但所有 worker 可能会在发送到响应 channel 时被阻塞(因为我们的例程无法收集它)。 Playable proof.

可以考虑在 channel 中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以将系统设计成缓冲区永远不会填满的方式)。有几种方法可以解决这个问题;例如,您可以运行一个单独的例程来收集响应,但是您需要保护 active映射与互斥锁。可行的。你也可以放 worker <- task进入选择,它将尝试将任务发送给工作人员,接收新任务(如果没有发送)或收集响应。人们可以利用 nil channel 永远不会准备好进行通信(被 select 忽略)这一事实,因此您可以在单个 select 中交替接收和发送任务。示例:

go func() {
var next Task // received task which needs to be passed to a worker
in := queue // incoming channel (new tasks) -- active
var out chan Task // outgoing channel (to workers) -- inactive
for {
select {
case t := <-in:
next = t // store task, so we can pass to worker
in, out = nil, worker // deactivate incoming channel, activate outgoing
case out <- next:
in, out = queue, nil // deactivate outgoing channel, activate incoming
case r := <-response:
collect <- r
}
}
}()

play

关于multithreading - 合并 channel 中的项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30329178/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com