- xml - AJAX/Jquery XML 解析
- 具有多重继承的 XML 模式
- .net - 枚举序列化 Json 与 XML
- XML 简单类型、简单内容、复杂类型、复杂内容
func main() {
jobs := []Job{job1, job2, job3}
numOfJobs := len(jobs)
resultsChan := make(chan *Result, numOfJobs)
jobChan := make(chan *job, numOfJobs)
go consume(numOfJobs, jobChan, resultsChan)
for i := 0; i < numOfJobs; i++ {
jobChan <- jobs[i]
}
close(jobChan)
for i := 0; i < numOfJobs; i++ {
<-resultsChan
}
close(resultsChan)
}
func (b *Blockchain) consume(num int, jobChan chan *Job, resultsChan chan *Result) {
for i := 0; i < num; i++ {
go func() {
job := <-jobChan
resultsChan <- doJob(job)
}()
}
}
在上面的示例中,作业被推送到 jobChan 中,goroutines 将其从 jobChan 中拉出并并发执行作业并将结果推送到 resultsChan 中。然后我们将从 resultsChan 中提取结果。
问题一:
在我的代码中,没有序列化/线性化的结果。虽然jobs的顺序是job1, job2, job3。结果可能会显示为 job3、job1、job2,具体取决于哪个花费的时间最长。
我仍然希望同时执行这些作业,但是,我需要确保结果从 resultsChan 中以与作为作业进入时相同的顺序出现。
问题2:
我有大约 30 万个作业,这意味着代码将生成多达 30 万个 goroutine。拥有如此多的 goroutine 是否有效,或者我最好将这些作业分成 100 个左右的片段,让每个 goroutine 处理 100 个而不是 1 个。
最佳答案
这是我处理序列化的一种方式(并且还设置了有限数量的工作人员)。我设置了一些带有输入和输出字段以及同步 channel 的工作对象,然后我循环遍历它们,挑选它们完成的所有工作并给它们一个新的工作。然后我最后一次通过它们以拾取所有遗留下来的已完成工作。请注意,您可能希望工作人员数量稍微超过您的核心数量,这样即使有一个异常长的工作,您也可以让所有资源保持忙碌一段时间。代码位于 http://play.golang.org/p/PM9y4ieMxw及以下。
这是毛茸茸的(比我记得在坐下来写一个例子之前毛茸茸的!)——很想看看其他人有什么,要么只是更好的实现,要么是一种完全不同的方式来实现你的目标。
package main
import (
"fmt"
"math/rand"
"runtime"
"time"
)
type Worker struct {
in int
out int
inited bool
jobReady chan bool
done chan bool
}
func (w *Worker) work() {
time.Sleep(time.Duration(rand.Float32() * float32(time.Second)))
w.out = w.in + 1000
}
func (w *Worker) listen() {
for <-w.jobReady {
w.work()
w.done <- true
}
}
func doSerialJobs(in chan int, out chan int) {
concurrency := 23
workers := make([]Worker, concurrency)
i := 0
// feed in and get out items
for workItem := range in {
w := &workers[i%
concurrency]
if w.inited {
<-w.done
out <- w.out
} else {
w.jobReady = make(chan bool)
w.done = make(chan bool)
w.inited = true
go w.listen()
}
w.in = workItem
w.jobReady <- true
i++
}
// get out any job results left over after we ran out of input
for n := 0; n < concurrency; n++ {
w := &workers[i%concurrency]
if w.inited {
<-w.done
out <- w.out
}
close(w.jobReady)
i++
}
close(out)
}
func main() {
runtime.GOMAXPROCS(10)
in, out := make(chan int), make(chan int)
allFinished := make(chan bool)
go doSerialJobs(in, out)
go func() {
for result := range out {
fmt.Println(result)
}
allFinished <- true
}()
for i := 0; i < 100; i++ {
in <- i
}
close(in)
<-allFinished
}
请注意,此示例中只有 in
和 out
携带实际数据——所有其他 channel 仅用于同步。
关于concurrency - 戈朗 : Producer/Consumer concurrency model but with serialized results,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20978778/
我有一个生产者/消费者场景,我不希望一个生产者交付产品,也不希望多个消费者消费这些产品。然而,常见的情况是交付的产品仅由一个消费者消费,而其他消费者永远看不到该特定产品。我不想完成的是每个消费者消费一
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
关于 REST Web 服务。 @Produces("application/json") 和 @Produces(MediaType.APPICATION_JSON) 两者的工作方式相同,但第二个需
我正在尝试使用 Kafka: import java.util.Properties; import org.apache.kafka.clients.producer.Producer; impor
当我使用 Producer.flush() 时,它可以工作,但根据 kafka confluent issue 性能较差,但按照建议,我使用 Producer.poll(0) 但不会向主题生成任何消息
我正在针对 Python 的 confluent-kafka 使用 native java 实现测试 Apache Kafka Producer,以查看哪个具有最大吞吐量。 我正在使用 docker-
我看到 @products 注释允许我传递单个字符串和字符串列表。所以我只是想知道这是如何在java中完成的,如果我需要使用允许以下行为的方法来实现它,我该怎么做?或者这个注释是特定的,所以我们不能在
我正在开发一个迁移学习应用程序,我正在其中针对我的数据流重新训练 MobileNetV2。 我正在使用 retrain.py 重新训练模型来自tensorflow-hub并且没有做任何修改。 当我从终
在 Cloud Foundry 中,我能够向非 ssl url(“kafkaURL:9092”)生成消息。但它不适用于 ssl url(“kafkaURL:9093”)。 Kafka 服务器版本 0.
我正在使用 kafka 向消费者发送消息。但是由于某种原因,当我使用 Producer.send(record, new MyProducerCallback()); 向主题发送记录时,该主题的使用者
我正在编写一个演示应用程序来创建一个 Kafka Producer。我创建了一个主题并在 Kafka 上运行了一个生产者和消费者,它似乎正在工作。我正在编写一个 spring 应用程序来创建一个生产者
我在我的项目中使用 spring boot v2.2.4 和 Apache Kafka。 下面是我的pom.xml文件: org.springframewo
我正在尝试使用 java 程序制作 Kafka 生产者。但是当我运行程序时我收到了一些警告,没有任何错误但是生产者没有发送数据并且警告如下所示。 [kafka-producer-network-thr
我正在尝试加载一个简单的文本文件而不是 Kafka 中的标准输入。下载 Kafka 后,我执行了以下步骤: 启动动物园管理员: bin/zookeeper-server-start.sh config
我有一个类,它生成一个 ElasticSearch 客户端以与 @Inject 一起使用 @Produces @ApplicationScoped public Client createClient
对于一个新项目,我们在客户端使用 jQuery 组件,其中之一是 blueImp 文件 uploader 。我们愉快地编写代码,在 Chrome 和 Firefox 中一切都运行良好……直到有人尝试在
我有一些开发要做,我尝试看看是否有可以使用的设计模式。问题很简单: 我有一个启动许多线程的主线程。主线程必须等待每个线程完成然后再做其他事情。现有的代码有点难看。我有一个 while 循环来检查线程组
我正在使用驱动对象模型工具 CodeFluentEntities以便将模型部署到数据库引擎。 我正在考虑使用 localStorage 数据库引擎(如 IndexedDB 或 Web SQL)来为没有
我无法停止 ActiveMQ Producer。 场景是:我为内存使用和临时存储设置了较低的值。
我正在尝试结合使用 CDI (weld-se 2) 和 JavaFX,并且我想使用自定义创建的注释来注释我的 Controller 类,以便使用我的工厂方法管理此类创建。我想应该如下所示,但这段代码不
我是一名优秀的程序员,十分优秀!