- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Golang你一定要懂的连接池实现由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
问题引入 。
作为一名Golang开发者,线上环境遇到过好几次连接数暴增问题(mysql/redis/kafka等).
纠其原因,Golang作为常驻进程,请求第三方服务或者资源完毕后,需要手动关闭连接,否则连接会一直存在。而很多时候,开发者不一定记得关闭这个连接.
这样是不是很麻烦?于是有了连接池。顾名思义,连接池就是管理连接的;我们从连接池获取连接,请求完毕后再将连接还给连接池;连接池帮我们做了连接的建立、复用以及回收工作.
在设计与实现连接池时,我们通常需要考虑以下几个问题:
Golang连接池实现原理 。
我们以Golang HTTP连接池为例,分析连接池的实现原理.
结构体Transport 。
Transport结构定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
type Transport struct {
//操作空闲连接需要获取锁
idleMu sync.Mutex
//空闲连接池,key为协议目标地址等组合
idleConn map[connectMethodKey][]*persistConn // most recently used at end
//等待空闲连接的队列,基于切片实现,队列大小无限制
idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns
//排队等待建立连接需要获取锁
connsPerHostMu sync.Mutex
//每个host建立的连接数
connsPerHost map[connectMethodKey]int
//等待建立连接的队列,同样基于切片实现,队列大小无限制
connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns
//最大空闲连接数
MaxIdleConns int
//每个目标host最大空闲连接数;默认为2(注意默认值)
MaxIdleConnsPerHost int
//每个host可建立的最大连接数
MaxConnsPerHost int
//连接多少时间没有使用则被关闭
IdleConnTimeout time.Duration
//禁用长连接,使用短连接
DisableKeepAlives bool
}
|
可以看到,连接护着队列,都是一个map结构,而key为协议目标地址等组合,即同一种协议与同一个目标host可建立的连接或者空闲连接是有限制的.
需要特别注意的是,MaxIdleConnsPerHost默认等于2,即与目标主机最多只维护两个空闲连接。这会导致什么呢?
如果遇到突发流量,瞬间建立大量连接,但是回收连接时,由于最大空闲连接数的限制,该联机不能进入空闲连接池,只能直接关闭。结果是,一直新建大量连接,又关闭大量连,业务机器的TIME_WAIT连接数随之突增.
线上有些业务架构是这样的:客户端 ===> LVS ===> Nginx ===> 服务。LVS负载均衡方案采用DR模式,LVS与Nginx配置统一VIP。此时在客户端看来,只有一个IP地址,只有一个Host。上述问题更为明显.
最后,Transport也提供了配置DisableKeepAlives,禁用长连接,使用短连接访问第三方资源或者服务.
连接获取与回收 。
Transport结构提供下面两个方法实现连接的获取与回收操作.
1
2
3
|
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {}
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {}
|
连接的获取主要分为两步走:1)尝试获取空闲连接;2)尝试新建连接:
1
2
3
4
5
6
7
|
//getConn方法内部实现
if delivered := t.queueForIdleConn(w); delivered {
return pc, nil
}
t.queueForDial(w)
|
当然,可能获取不到连接而需要排队,此时怎么办呢?当前会阻塞当前协程了,直到获取连接为止,或者httpclient超时取消请求:
1
2
3
4
5
6
7
8
9
10
11
|
select {
case <-w.ready:
return w.pc, w.err
//超时被取消
case <-req.Cancel:
return nil, errRequestCanceledConn
……
}
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection") // TODO: unify?
|
排队等待空闲连接的逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
//如果配置了空闲超时时间,获取到连接需要检测,超时则关闭连接
if t.IdleConnTimeout > 0 {
oldTime = time.Now().Add(-t.IdleConnTimeout)
}
if list, ok := t.idleConn[w.key]; ok {
for len(list) > 0 && !stop {
pconn := list[len(list)-1]
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
//超时了,关闭连接
if tooOld {
go pconn.closeConnIfStillIdle()
}
//分发连接到wantConn
delivered = w.tryDeliver(pconn, nil)
}
}
//排队等待空闲连接
q := t.idleConnWait[w.key]
q.pushBack(w)
t.idleConnWait[w.key] = q
}
|
排队等待新建连接的逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (t *Transport) queueForDial(w *wantConn) {
//如果没有限制最大连接数,直接建立连接
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}
//如果没超过连接数限制,直接建立连接
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
go t.dialConnFor(w)
return
}
//排队等待连接建立
q := t.connsPerHostWait[w.key]
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}
|
连接建立完成后,同样会调用tryDeliver分发连接到wantConn,同时关闭通道w.ready,这样主协程纠接触阻塞了.
1
2
3
4
|
func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
w.pc = pc
close(w.ready)
}
|
请求处理完成后,通过tryPutIdleConn将连接放回连接池;这时候如果存在等待空闲连接的协程,则需要分发复用该连接。另外,在回收连接时,还需要校验空闲连接数目是否超过限制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
//禁用长连接;或者最大空闲连接数不合法
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
return errKeepAlivesDisabled
}
if q, ok := t.idleConnWait[key]; ok {
//如果等待队列不为空,分发连接
for q.len() > 0 {
w := q.popFront()
if w.tryDeliver(pconn, nil) {
done = true
break
}
}
}
//空闲连接数目超过限制,默认为DefaultMaxIdleConnsPerHost=2
idles := t.idleConn[key]
if len(idles) >= t.maxIdleConnsPerHost() {
return errTooManyIdleHost
}
}
|
空闲连接超时关闭 。
Golang HTTP连接池如何实现空闲连接的超时关闭逻辑呢?从上述queueForIdleConn逻辑可以看到,每次在获取到空闲连接时,都会检测是否已经超时,超时则关闭连接.
那如果没有业务请求到达,一直不需要获取连接,空闲连接就不会超时关闭吗?其实在将空闲连接添加到连接池时,Golang同时还设置了定时器,定时器到期后,自然会关闭该连接.
1
|
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
|
排队队列怎么实现 。
怎么实现队列模型呢?很简单,可以基于切片:
1
2
3
4
5
6
7
8
9
|
queue []*wantConn
//入队
queue = append(queue, w)
//出队
v := queue[0]
queue[0] = nil
queue = queue[1:]
|
这样有什么问题吗?随着频繁的入队与出队操作,切片queue的底层数组,会有大量空间无法复用而造成浪费。除非该切片执行了扩容操作.
Golang在实现队列时,使用了两个切片head和tail;head切片用于出队操作,tail切片用于入队操作;出队时,如果head切片为空,则交换head与tail。通过这种方式,Golang实现了底层数组空间的复用.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (q *wantConnQueue) pushBack(w *wantConn) {
q.tail = append(q.tail, w)
}
func (q *wantConnQueue) popFront() *wantConn {
if q.headPos >= len(q.head) {
if len(q.tail) == 0 {
return nil
}
// Pick up tail as new head, clear tail.
q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
}
w := q.head[q.headPos]
q.head[q.headPos] = nil
q.headPos++
return w
}
|
到此这篇关于Golang你一定要懂的连接池实现的文章就介绍到这了,更多相关Golang 连接池内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://segmentfault.com/a/1190000023676010 。
最后此篇关于Golang你一定要懂的连接池实现的文章就讲到这里了,如果你想了解更多关于Golang你一定要懂的连接池实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
最近,我们将专用 SQL 池部署到生产中的 Synapse 工作区。在开发中,我们可以访问无服务器 SQL 池和专用 SQL 池。但是,在生产中,我们可以访问无服务器 SQL 池,但无法访问专用 SQ
假设您从一个项目公开 WCF 服务,并使用“添加服务引用”(在本例中为 Framework 3.5 WPF 应用程序)在另一个项目中使用它。 当您重新实例化 ClientBase 派生代理时,Clie
我有一个函数,它使用 multiprocessing.Pool 并行处理一个数据集中的所有数据。 from multiprocessing import Pool ... def func():
我正在尝试使用进程对象在 python 中使用工作池。每个 worker (一个进程)进行一些初始化(花费大量时间),传递一系列作业(理想情况下使用 map()),并返回一些东西。除此之外,不需要任何
我是软件工程师,最近我构建了我的 Linux 机器,想探索更多系统管理员类型的任务。我已经探索并阅读了很多关于 ZFS 的内容,但我越来越困惑,因为每篇文章对它的描述都不一样。 Everything
我有 zfs 池: $ sudo zpool status lxd pool: lxd state: ONLINE scan: none requested config: NAME
我有一个基于 Actor 的项目,对于其中的一部分,我必须使用一些接收消息的 Actor ,然后一个 Actor 分别分配给每个请求,每个 Actor 负责执行其消息请求,所以我需要类似线程的东西我的
我已经使用 QEMU 模拟器成功地将 FreeBSD 安装到原始图像文件中。我已经使用 ZFS 文件系统 (ZFS POOL) 格式化了图像文件。 使用下面的命令我已经成功地挂载了准备好由 zpool
我正在使用 multiprocessor.Pool并行处理一些文件。该代码等待接收文件,然后使用 Pool.apply_async 将该文件发送给工作人员。 ,然后处理文件。 这段代码应该一直在运行,
我正在使用带有光滑的 Bonecp 数据源。并发现池包含关闭的连接所以我总是遇到这个异常 java.sql.SQLException: Connection is closed! at com
我有apartment gem的 Multi-Tenancy Rails应用程序,我可以使用apartment-sidekiq在每个工作程序中成功切换数据库租户。但是,sidekiq worker 正
ZFS 池可能由数据集(文件系统、快照等)或卷组成。 ZFS 卷就像 block 设备,但我不明白池和文件系统之间的区别。当我通过 zpool create pool1 sda sdb sdc 创建
我在 docker 容器上运行了 airflow。我正在使用 airflow 2.0.2 版。 我知道我实际上可以通过 UI 创建池。但我正在寻找一种通过 pools.json 文件在 docker
我在tomcat中有一个jdbc池,用于建立数据库连接。我在使用后没有显式关闭连接对象。我的“maxActive”参数设置为100。应用程序运行了一段时间,但随后失败进行数据库查询。它会等待无限时间来
阅读 PostgreSQL 文档 here我读了以下内容: As well, connections requested for users other than the default config
我在 docker 容器上运行了 airflow。我正在使用 airflow 2.0.2 版。 我知道我实际上可以通过 UI 创建池。但我正在寻找一种通过 pools.json 文件在 docker
我正在读取一个大的 URL 文件并向服务发出请求。该请求由返回 ListenableFuture 的客户端执行。现在我想保留一个 ListenableFuture 池,例如最多同时执行 N 个 Fut
我想使用队列来保存结果,因为我希望消费者(串行而不是并行)在工作人员产生结果时处理工作人员的结果。 现在,我想知道为什么以下程序挂起。 import multiprocessing as mp imp
我正在开发一个单页应用程序,目前正在构建一个 JQuery、ajax 函数,以便我的所有调用都能通过。 对于一个典型的页面,我可能有 3 个 ajax 调用。我的想法是,如果用户互联网出去将这些 aj
我有一个单位类及其一些子类(弓箭手、剑客等)。我怎样才能创建一个回收所有单元类型子类的池? 最佳答案 这是不可能的,因为池只能包含一种特定类型的对象。否则你可能会遇到这样的情况: Pool unitP
我是一名优秀的程序员,十分优秀!