- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口(interface)。您会看到,管道的工作是从输入 channel 接收数据,对其进行处理,然后将结果输出到 channel 上。这是它的预期行为:
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
defer wg.Done()
wg.Add(1)
res := doSomethingWith(j)
out <- res
}(j)
}
}
send on closed channel
的 panic 。信息。使用
-race
构建源代码flag 在
close(out)
之间发出数据竞争警告和
out <- res
.
wg
的计数器达到零。因此,
wg.Wait()
完成并且程序继续到
close(out)
.同时,job channel 还没有完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于
out
channel 已经关闭,它会导致 panic 。
最佳答案
目前尚不清楚为什么每个工作需要一名 worker ,但如果你这样做了,你可以重组你的外循环设置(见下面未经测试的代码)。这种方式首先消除了对工作池的需求。
但是,请始终执行 wg.Add
在解雇任何 worker 之前。在这里,你正在剥离 100 个 worker :
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
wg.Add(100) // ADDED - count the 100 workers
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
wg
本身进入剥离 worker 的 goroutine。如果你放弃让每个工作人员将工作作为新的 goroutine 分拆的想法,这可以让事情变得更干净。但是,如果每个 worker 都将衍生出另一个 goroutine,那么该 worker 本身也必须使用
wg.Add
, 像这样:
for j := range jobs {
wg.Add(1) // ADDED - count the spun-off goroutines
func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done() // MOVED (for illustration only, can defer as before)
}(j)
}
wg.Done() // ADDED - our work in `p.work` is now done
wg.Add(1)
)。当你读完输入 channel
jobs
, 调用
wg.Done()
(也许是通过较早的
defer
,但我在这里最后展示了它)。
wg
计算此时可以写入 channel 的事件 goroutine 的数量。只有当没有 goroutine 打算再写时,它才会变为零。 这样可以安全地关闭 channel 。
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
var wg sync.WaitGroup
go func() {
defer close(out)
for j := range in {
wg.Add(1)
go func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done()
}(j)
}
wg.Wait()
}()
return out
}
in
尽可能快地进行 channel ,在运行时分拆工作。每个传入的工作都会得到一个 goroutine,除非他们提前完成工作。没有池,每个作业只有一个 worker (与您的代码相同,只是我们淘汰了没有做任何有用的池)。
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
go func() {
defer close(out)
var wg sync.WaitGroup
ncpu := runtime.NumCPU() // or something fancier if you like
wg.Add(ncpu)
for i := 0; i < ncpu; i++ {
go func() {
defer wg.Done()
for j := range in {
out <- doSomethingWith(j)
}
}()
}
wg.Wait()
}
return out
}
runtime.NumCPU()
读取作业的 worker 数量与运行作业的 CPU 数量一样多。那些是游泳池,他们一次只做一项工作。
关于go - 关闭和发送到 channel 之间的竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59527245/
假设我正在使用 APC,其中过程和调用代码都使用 SetLastError 和 GetLastError。这会导致 GetLastError 产生不可预测的值。有什么办法可以解决这个问题吗? VOID
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 7年前关闭。 Improve t
任何人都可以,请告诉我,如何在不进行JavaScript轮询/ setInterval的情况下,在完整日历上填充/显示在服务器端动态更新的数据。 grails中提供了Atmosphere插件,但是文档
我正在尝试调整我的代码,从仅在前台使用 WCSessionDelegate 回调到在后台通过 handleBackgroundTasks: 接受 WKWatchConnectivityRefreshB
我正在构建批处理系统。 单位 的批处理数量从 20 到 1000 不等。每个 Unit 本质上都是模型的层次结构(一个主模型和许多子模型)。我的任务涉及将每个模型层次结构作为单个事务保存到数据库中(每
我拍了一张图片并将其切成三 block ,然后将它们向右浮动,让文字围绕它们流动。 HTML 看起来像这样: 在我添加侧边栏并将其 float 到图像的右上方之前,它工作正常,就像这样... T
我正在考虑嵌入式 Linux 项目(还没有硬件)中即将出现的情况,其中两个外部芯片需要共享一条物理 IRQ 线。这条线在硬件中能够实现边沿触发,但不能实现电平触发中断。 查看 Linux 中的共享 i
我观察到,当 linux futexes 发生争用时,系统会在自旋锁上花费大量时间。我注意到即使不直接使用 futex 也是一个问题,但在调用 malloc/free、rand、glib 互斥调用和其
我终于能够获得一些工具提示,最终可以使用以下代码: Hover over me 然后 $('[rel=tooltip]').tooltip(); 我遇到的问题是它使用 jQueryUI 工
我是一名优秀的程序员,十分优秀!