- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我有一个接收任务并将它们放入 channel 的函数。每个任务都有 ID、一些属性和一个放置结果的 channel 。看起来像这样
task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)
另一个goroutine从 channel 中取出一个任务,处理它并将结果放入任务的 channel
task := <-queue
task.Result <- doExpensiveComputation(task)
这段代码工作正常。但现在我想合并 queue
中的任务。任务处理是一个非常昂贵的操作,所以我想处理一次队列中具有相同 ID 的所有任务。我看到两种方法。
第一个是不将具有相同 ID 的任务放入队列,因此当现有任务到达时,它会等待它的副本完成。这是伪代码
if newTask in queue {
existing := queue.getById(newTask.ID)
existing.waitForComplete()
sendResponse(existing.ProcessingResult)
} else {
queue.enqueue(newTask)
}
因此,我可以使用 go channel 和 map 实现随机访问 + 一些同步方式(如互斥)。我不喜欢这种方式的地方是我必须在代码中携带 map 和 channel 并保持它们的内容同步。
第二种方式是将所有任务放入队列,当结果到达时,将任务和所有具有相同ID的任务从队列中取出,然后将结果发送给所有任务。这是伪代码
someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
theSameTask.Result <- result
}
而且我知道如何使用与上述相同的方式使用 chan + map + mutex 来实现它。
问题是:是否有一些内置/现有的数据结构可用于此类问题?还有其他(更好的)方法吗?
最佳答案
如果我正确理解了这个问题,我想到的最简单的解决方案是在任务发送者(放入 queue
)和工作人员(从 queue
获取)之间添加一个中间层。这可能是例行公事,负责存储当前任务(按 ID)并将结果广播到每个匹配的任务。
伪代码:
go func() {
active := make(map[TaskID][]Task)
for {
select {
case task := <-queue:
tasks := active[task.ID]
// No tasks with such ID, start heavy work
if len(tasks) == 0 {
worker <- task
}
// Save task for the result
active[task.ID] = append(active[task.ID], task)
case r := <-response:
// Broadcast to all tasks
for _, task := range active[r.ID] {
task.Result <- r.Result
}
}
}
}()
不需要互斥体,也可能不需要携带任何东西,工作人员只需将所有结果放入这个中间层,然后正确路由响应。如果冲突 ID 有可能在一段时间内到达,您甚至可以轻松地在此处添加缓存。
编辑: 我梦见上面的代码导致了死锁。如果您一次发送大量请求并阻塞 worker
channel 有一个严重的问题——这个中间层例程卡在 worker <- task
上等待 worker 完成,但所有 worker 可能会在发送到响应 channel 时被阻塞(因为我们的例程无法收集它)。 Playable proof.
可以考虑在 channel 中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以将系统设计成缓冲区永远不会填满的方式)。有几种方法可以解决这个问题;例如,您可以运行一个单独的例程来收集响应,但是您需要保护 active
映射与互斥锁。可行的。你也可以放 worker <- task
进入选择,它将尝试将任务发送给工作人员,接收新任务(如果没有发送)或收集响应。人们可以利用 nil channel 永远不会准备好进行通信(被 select 忽略)这一事实,因此您可以在单个 select 中交替接收和发送任务。示例:
go func() {
var next Task // received task which needs to be passed to a worker
in := queue // incoming channel (new tasks) -- active
var out chan Task // outgoing channel (to workers) -- inactive
for {
select {
case t := <-in:
next = t // store task, so we can pass to worker
in, out = nil, worker // deactivate incoming channel, activate outgoing
case out <- next:
in, out = queue, nil // deactivate outgoing channel, activate incoming
case r := <-response:
collect <- r
}
}
}()
关于multithreading - 合并 channel 中的项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30329178/
我很好奇为什么以下不起作用。一般select用default:防止死锁,但在这种情况下不是: package main import "fmt" func main () { a := mak
我一生都无法弄清楚如何切换图像排序。图像以 (x,x,3) 格式读取,theano 要求它是 (3,x,x) 格式。我尝试更改顺序numpy.array([img[:,:,i] for i in ra
我正在向 libnl 发送单个 SSID 和频率进行扫描,但我得到了多个扫描结果以及我请求的 SSID 和频率,但我需要单个扫描结果(仅适用于请求的 SSID),如何实现这一点。请帮助我,我也发送了我
我是 Golang 的新手,但正在努力理解这门伟大的语言!请帮帮我.. 我有 2 个 channel 。 “进”和“出” channel in, out := make(chan Work),
例如我有这段代码: package main import ( "fmt" ) func main() { c1 := make(chan interface{}) close
我们使用以下调用来获取经过身份验证的用户的 ChannelID,它适用于大多数情况。一些 YouTube 用户将他们的 channel 连接到 Google+ 信息页,但在这种情况下,我们的一位用户无
case 'sinfo': const sinfo = new Discord.MessageEmbed() .addField('Server Name 🔎 :', message.guild.n
我需要让所有 channel 来创建一个 bunker 命令,这使得所有 channel 都是只读的。 最佳答案 他们变了Client.servers至 Client.guilds在 newer ve
为什么当第二个值通过另一个 go routine 发送并且没有收到发送的第一个值时, channel c 没有缓冲? package main import "fmt" func sum(s []in
据我所知,内置的 split 会将一个 3 channel Mat 拆分为三个 1 channel Mat。结果,这三个 Mat 只是具有一些不同强度的灰度。 我的意图是获得三个 3 channel
如何检测当前的 RAM 配置?我需要询问 Windows RAM 当前是在单 channel 、双 channel 还是四 channel 中运行。 我搜索了很多,并没有在这个网站或其他网站上找到任何
我需要拆分一个多 channel wav 文件并将每个 channel 编码为 mp3 文件。 我知道 gtresamer 的 deinterleave 插件,但我不确定如何将它用于 wav 文件以及
关闭。这个问题需要details or clarity .它目前不接受答案。 想要改进这个问题吗? 通过 editing this post 添加详细信息并澄清问题. 关闭 8 年前。 Improve
我正在尝试运行 Hyperledger Fabric 网络,它由单个订购者、单个对等节点和一个 cli 组成。为了学习启动 Hyperledger Fabric 网络的过程,从创建与加密相关的工件到将
我在 Laravel 中使用事件广播。我正在使用基于角色的通知访问权限。我有用于广播的自定义 auth guard。当用户连接到 channel 时,客户端将具有内部权限的 access_token
我正在编写一个使用 Elixir Channels 来处理实时事件的应用程序。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个 channel 。所以我的应用程序是一个聊天应用程序,其
我有一些 .wav 文件,我想转换它们的频率 (fs) 和 channel 数 (nchannels)。我在jupyter笔记本python3.6上使用ffmpeg。我使用了以下命令并且它有效。 cm
我有一个视频渲染器,它需要两个 H265 流(YUV420),我需要烘焙它们以使它们中的一个与另一个形成 alpha 蒙版。这一切都已解决并且效果很好,但是如果我按照此处的说明进行操作: ffmpeg
我运行此命令以便能够将 udp 直播流传输到可使用正在构建的移动应用程序播放的 http 直播流。 它只是一个只有音频流的流。 ffmpeg -i udp://@localhost:1111 -map
我在我的 discord.js 机器人中创建了 nuke 命令,它创建了具有相同名称、权限、主题等的 channel ,并删除了“原始” channel 。但是有一个问题,如何使 channel 与“
我是一名优秀的程序员,十分优秀!