- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我正在尝试构建一个系统,工作池/作业队列,以在每个 API 端点上处理尽可能多的 http 请求
。我调查了这个 example让它工作得很好,只是我偶然发现了我不明白如何将 pool/jobqueue
扩展到不同端点的问题。
出于场景考虑,让我们绘制一个 Golang http 服务器,它在不同的端点和请求类型 GET
和 POST
ETC 上有百万请求/分钟。
如何扩展这个概念?我应该为每个端点创建不同的工作池和作业吗?或者我可以创建不同的作业并将它们输入同一个队列并让同一个池处理这些作业吗?
我想保持简单性,如果我创建一个新的 API 端点,我就不必创建新的工作线程池,这样我就可以只专注于 API。但性能也非常重要。
我尝试构建的代码取自前面链接的示例,here是具有此代码的其他人的 github“要点”。
最佳答案
首先要说明的是:如果你正在运行一个 HTTP 服务器(无论如何都是 Go 的标准服务器),你无法在不停止并重新启动服务器的情况下控制 goroutines 的数量。每个请求至少启动一个 goroutine,对此您无能为力。好消息是这通常不是问题,因为 goroutines 非常轻量级。但是,您希望控制正在努力工作的 goroutines 的数量是完全合理的。
您可以将任何值放入 channel ,包括函数。因此,如果目标是只需要在 http 处理程序中编写代码,就让作业关闭——工作人员不知道(或不关心)他们在做什么。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan func()
var smallPool chan func()
func main() {
// Start two different sized worker pools (e.g., for different workloads).
// Cancelation and graceful shutdown omited for brevity.
largePool = make(chan func(), 100)
smallPool = make(chan func(), 10)
for i := 0; i < 100; i++ {
go func() {
for f := range largePool {
f()
}
}()
}
for i := 0; i < 10; i++ {
go func() {
for f := range smallPool {
f()
}
}()
}
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
// Imagine a JSON body containing a URL that we are expected to fetch.
// Light work that doesn't consume many of *our* resources and can be done
// in bulk, so we put in in the large pool.
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
largePool <- func() {
http.Get(job.URL)
// Do something with the response
}
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
// The request body is an image that we want to do some fancy processing
// on. That's hard work; we don't want to do too many of them at once, so
// so we put those jobs in the small pool.
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
smallPool <- func() {
processImage(b)
}
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
这是一个非常简单的例子来说明这一点。如何设置工作池并不重要。你只需要一个聪明的工作定义。在上面的示例中,它是一个闭包,但您也可以定义一个 Job 接口(interface),例如。
type Job interface {
Do()
}
var largePool chan Job
var smallPool chan Job
现在,我不会将整个工作线程池方法称为“简单”。你说你的目标是限制 goroutines(正在工作)的数量。那根本不需要 worker ;它只需要一个限制器。这是与上面相同的示例,但是使用 channel 作为信号量来限制并发。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan struct{}
var smallPool chan struct{}
func main() {
largePool = make(chan struct{}, 100)
smallPool = make(chan struct{}, 10)
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2)
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
// Block until there are fewer than cap(largePool) light-work
// goroutines running.
largePool <- struct{}{}
defer func() { <-largePool }() // Let everyone that we are done
http.Get(job.URL)
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
// Block until there are fewer than cap(smallPool) hard-work
// goroutines running.
smallPool <- struct{}{}
defer func() { <-smallPool }() // Let everyone that we are done
processImage(b)
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
关于multithreading - Golang HTTP 请求工作池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46820427/
我有2个功能: function func1() while true do -- listen on connection end end function func2()
我的问题可能看起来很奇怪,但我想我正面临着 volatile 的问题。对象。 我写了一个这样实现的库(只是一个方案,不是真正的内容): (def var1 (volatile! nil)) (def
由于 maven 支持多线程构建,是否可以同时运行 Sonar 多线程? (例如 mvn sonar:sonar -T 4 ) 我运行了它,当模块报告成功时,它报告整个构建失败并返回 java.uti
我们正在启动一个网站,该网站在短时间内的交易量非常大。它基本上是在给票。该代码是用Java,Spring和Hibernate编写的。我想通过产生多个线程并尝试使用JUnit测试用例来获取票证来模仿高容
我正在尝试访问像素数据并将图像从游戏中的相机保存到磁盘。最初,简单的方法是使用渲染目标,然后使用RenderTarget-> ReadPixels(),但是由于ReadPixels()的 native
我们有以下系统: 用户数:〜500k 项目数:〜100k UserSimilarity userSimilarity = new TanimotoCoefficientSimilarity(dataM
也许这是一个经常出现的问题,但我需要根据我的上下文进行一些自定义。 我正在使用 Spring Batch 3.0.1.RELEASE 我有一个简单的工作,有一些步骤。一个步骤是这样的 block :
也许这是一个经常出现的问题,但我需要根据我的上下文进行一些自定义。 我正在使用 Spring Batch 3.0.1.RELEASE 我有一个简单的工作,有一些步骤。一个步骤是这样的 block :
我正在尝试使用PyBrain和Python的multiprocessing软件包在Python中训练神经网络。 这是我的代码(它训练了一个简单的神经网络来学习XOR逻辑)。 import pybrai
我有一个繁重的功能,不适合在主时间轴上执行(因为要花很长时间才能完成并使程序崩溃)。 因此我在air(as3)中搜索多线程,但是我发现的所有示例都说明了如何在worker中运行单独的swf文件。如何在
我想实现线程A 和线程B 并行运行并共享全局变量。 下面是用python编写的代码。我想在中执行相同操作Dart (我不想使用future等待,因为它正在等待其他线程完成或必须等待。) 大小写变量:
我的一个项目只适用于调试 DLL,而不适用于非调试 DLL。 在 Debug DLL 设置下发布项目有哪些注意事项?例如,是否丢失了某些优化? 如何通过将调试版本设置为非调试 DLL 来调试此项目?我
我正在尝试比较 Matlab 和 Julia 之间的速度和性能。我正在查看一个代码,该代码对承受给定负载的连续体结构进行拓扑优化。我正在查看的代码是公共(public)代码topopt88.m:htt
Serving Flask 应用程序“服务器”(延迟加载) 环境:生产警告:这是一个开发服务器。不要在生产部署中使用它。请改用生产 WSGI 服务器。 Debug模式:开启 在 http://0.0.
我对 PyQT 很陌生。我正在学习如何制作 Progressbar 并随着算法的进展对其进行更新。我已经能够制作一个使用此链接进行 self 更新的基本进度条:Python pyqt pulsing
我正在尝试指定在特定线程上运行任务,这样我就可以使用两个专用于“放入” channel 的耗时任务的线程,而其他线程则用于处理该任务。 我对如何将特定任务分配给特定线程感到困惑。我以为我可以使用类似
我正在编写一个软件,它对很多(潜在的大)图像进行大量图像操作/合成。 多线程有助于提高速度,但 QT 不允许同时在同一图像上使用多个 QPainter。 所以我必须在副本的每个线程中进行图像操作/合成
此脚本读取 url 文件以执行多线程 HTTP 请求。 如何使用带有 url 的数组来发出多线程请求? 我的阵列将有类似的东西: @array = ("https://example.com/xsd"
Java 文档声明了以下关于构造函数同步的内容: Note that constructors cannot be synchronized — using the synchronized keyw
我有一个程序,其中主线程创建了很多线程。它崩溃了,我正在调试核心文件。崩溃发生在其中一个子线程中。为了找到原因,我需要知道主线程是否还活着。有什么方法可以找出哪个线程是初始线程? 最佳答案 Is th
我是一名优秀的程序员,十分优秀!