- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章golang 并发安全Map以及分段锁的实现方法由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
涉及概念 。
分断锁 。
1
2
3
4
|
type SimpleCache struct {
mu sync.RWMutex
items map[interface{}]*simpleItem
}
|
在日常开发中, 上述这种数据结构肯定不少见,因为golang的原生map是非并发安全的,所以为了保证map的并发安全,最简单的方式就是给map加锁.
之前使用过两个本地内存缓存的开源库, gcache, cache2go,其中存储缓存对象的结构都是这样,对于轻量级的缓存库,为了设计简洁(包含清理过期对象等 ) 再加上当需要缓存大量数据时有redis,memcache等明星项目解决。 但是如果抛开这些因素遇到真正数量巨大的数据量时,直接对一个map加锁,当map中的值越来越多,访问map的请求越来越多,大家都竞争这一把锁显得并发访问控制变重。 在go1.9引入sync.Map 之前,比较流行的做法就是使用分段锁,顾名思义就是将锁分段,将锁的粒度变小,将存储的对象分散到各个分片中,每个分片由一把锁控制,这样使得当需要对在A分片上的数据进行读写时不会影响B分片的读写.
分段锁的实现 。
1
2
3
4
5
6
7
8
|
// Map 分片
type ConcurrentMap []*ConcurrentMapShared
// 每一个Map 是一个加锁的并发安全Map
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // 各个分片Map各自的锁
}
|
主流的分段锁,即通过hash取模的方式找到当前访问的key处于哪一个分片之上,再对该分片进行加锁之后再读写。分片定位时,常用有BKDR, FNV32等hash算法得到key的hash值.
1
2
3
4
5
6
7
8
9
10
|
func New() ConcurrentMap {
// SHARD_COUNT 默认32个分片
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{
items: make(map[string]interface{}),
}
}
return m
}
|
在初始化好分片后, 对分片上的数据进行读写时就需要用hash取模进行分段定位来确认即将要读写的分片.
获取段定位 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}
// FNV hash
func fnv32(key string) uint32 {
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}
|
之后对于map的GET SET 就简单顺利成章的完成 。
Set And Get 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (m ConcurrentMap) Set(key string, value interface{}) {
shard := m.GetShard(key) // 段定位找到分片
shard.Lock() // 分片上锁
shard.items[key] = value // 分片操作
shard.Unlock() // 分片解锁
}
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
shard := m.GetShard(key)
shard.RLock()
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
|
由此一个分段锁Map就实现了, 但是比起普通的Map, 常用到的方法比如获取所有key, 获取所有Val 操作是要比原生Map复杂的,因为要遍历每一个分片的每一个数据, 好在golang的并发特性使得解决这类问题变得非常简单 。
Keys 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
// 统计当前分段map中item的个数
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}
return count
}
// 获取所有的key
func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)
// 每一个分片启动一个协程 遍历key
go func() {
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
defer wg.Done()
shard.RLock()
// 每个分片中的key遍历后都写入统计用的channel
for key := range shard.items {
ch <- key
}
shard.RUnlock()
}(shard)
}
wg.Wait()
close(ch)
}()
keys := make([]string, count)
// 统计各个协程并发读取Map分片的key
for k := range ch {
keys = append(keys, k)
}
return keys
}
|
这里写了一个benchMark来对该分段锁Map和原生的Map加锁方式进行压测, 场景为将一万个不重复的键值对同时以100万次写和100万次读,分别进行5次压测, 如下压测代码 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func BenchmarkMapShared(b *testing.B) {
num := 10000
testCase := genNoRepetTestCase(num) // 10000个不重复的键值对
m := New()
for _, v := range testCase {
m.Set(v.Key, v.Val)
}
b.ResetTimer()
for i := 0; i < 5; i++ {
b.Run(strconv.Itoa(i), func(b *testing.B) {
b.N = 1000000
wg := sync.WaitGroup{}
wg.Add(b.N * 2)
for i := 0; i < b.N; i++ {
e := testCase[rand.Intn(num)]
go func(key string, val interface{}) {
m.Set(key, val)
wg.Done()
}(e.Key, e.Val)
go func(key string) {
_, _ = m.Get(key)
wg.Done()
}(e.Key)
}
wg.Wait()
})
}
}
|
原生Map加锁压测结果 。
分段锁压测结果 。
可以看出在将锁的粒度细化后再面对大量需要控制并发安全的访问时,分段锁Map的耗时比原生Map加锁要快3倍有余 。
Sync.Map 。
go1.9之后加入了支持并发安全的Map sync.Map, sync.Map 通过一份只使用原子操作的数据和一份冗余了只读数据的加锁数据实现一定程度上的读写分离,使得大多数读操作和更新操作是原子操作,写入新数据才加锁的方式来提升性能。以下是 sync.Map源码剖析, 结构体中的注释都会在具体实现代码中提示相呼应 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
type Map struct {
// 保护dirty的锁
mu Mutex
// 只读数据(修改采用原子操作)
read atomic.Value
// 包含只读中所有数据(冗余),写入新数据时也在dirty中操作
dirty map[interface{}]*entry
// 当原子操作访问只读read时找不到数据时会去dirty中寻找,此时misses+1,dirty及作为存储新写入的数据,又冗余了只读结构中的数据,所以当misses > dirty 的长度时, 会将dirty升级为read,同时将老的dirty置nil
misses int
}
// Map struct 中的 read 就是readOnly 的指针
type readOnly struct {
// 基础Map
m map[interface{}]*entry
// 用于表示当前dirty中是否有read中不存在的数据, 在写入数据时, 如果发现dirty中没有新数据且dirty为nil时,会将read中未被删除的数据拷贝一份冗余到dirty中, 过程与Map struct中的 misses相呼应
amended bool
}
// 数据项
type entry struct {
p unsafe.Pointer
}
// 用于标记数据项已被删除(主要保证数据冗余时的并发安全)
// 上述Map结构中说到有一个将read数据拷贝冗余至dirty的过程, 因为删除数据项是将*entry置nil, 为了避免冗余过程中因并发问题导致*entry改变而影响到拷贝后的dirty正确性,所以sync.Map使用expunged来标记entry是否被删除
var expunged = unsafe.Pointer(new(interface{}))
|
在下面sync.Map具体实现中将会看到很多“双检查”代码,因为通过原子操作获取的值可能在进行其他非原子操作过程中已改变,所以再非原子操作后需要使用之前原子操作获取的值需要再次进行原子操作获取.
compareAndSwap 交换并比较, 用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时导致数据不一致问题.
sync.Map Write 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
func (m *Map) Store(key, value interface{}) {
// 先不上锁,而是从只读数据中按key读取, 如果已存在以compareAndSwap操作进行覆盖(update)
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
// 双检查获取read
read, _ = m.read.Load().(readOnly)
// 如果data在read中,更新entry
if e, ok := read.m[key]; ok {
// 如果原子操作读到的数据是被标记删除的, 则视为新数据写入dirty
if e.unexpungeLocked() {
m.dirty[key] = e
}
// 原子操作写新数据
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 原子操作写新数据
e.storeLocked(&value)
} else {
// 新数据
// 当dirty中没有新数据时,将read中数据冗余到dirty
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
// 在dirty中没有比read多出的新数据时触发冗余
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 检查entry是否被删除, 被删除的数据不冗余
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
// 将被删除(置nil)的数据以cas原子操作标记为expunged(防止因并发情况下其他操作导致冗余进dirty的数据不正确)
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
|
sync.Map Read 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 只读数据中没有,并且dirty有比read多的数据,加锁在dirty中找
if !ok && read.amended {
m.mu.Lock()
// 双检查, 因为上锁之前的语句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 只读中没有读取到的次数+1
e, ok = m.dirty[key]
// 检查是否达到触发dirty升级read的条件
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// atomic.Load 但被标记为删除的会返回nil
return e.load()
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
|
sync.Map DELETE 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 只读中不存在需要到dirty中去删除
if !ok && read.amended {
m.mu.Lock()
// 双检查, 因为上锁之前的语句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
|
同样以刚刚压测原生加锁Map和分段锁的方式来压测sync.Map 。
压测平均下来sync.Map和分段锁差别不大,但是比起分段锁, sync.Map则将锁的粒度更加的细小到对数据的状态上,使得大多数据可以无锁化操作, 同时比分段锁拥有更好的拓展性,因为分段锁使用前总是要定一个分片数量, 在做扩容或者缩小时很麻烦, 但要达到sync.Map这种性能既好又能动态扩容的程度,代码就相对复杂很多.
还有注意在使用sync.Map时切忌不要将其拷贝, go源码中有对sync.Map注释到” A Map must not be copied after first use.”因为当sync.Map被拷贝之后, Map类型的dirty还是那个map 但是read 和 锁却不是之前的read和锁(都不在一个世界你拿什么保护我), 所以必然导致并发不安全(为了写博我把sync.Map代码复制出来一份把私有成员改成可外部访问的打印指针) 。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:https://segmentfault.com/a/1190000018448064 。
最后此篇关于golang 并发安全Map以及分段锁的实现方法的文章就讲到这里了,如果你想了解更多关于golang 并发安全Map以及分段锁的实现方法的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在尝试在多线程环境中实现某种累积逻辑;我想知道没有 lock 和 synchronized 关键字是否有更好/更快的方法来做到这一点?以下是我当前的代码: public class Concurr
我需要帮助构建一个实现信号量的监视器,简单的 C 示例就可以。 这是为了证明可以在任何可以使用信号量的地方使用监视器。 最佳答案 如果您说允许使用互斥锁/condvars,请检查: #include
我已经构建了一些返回部分产品目录的 ajax,并且我正在尝试将 xml 输出到文档中,到目前为止,这是我所拥有的: $("#catalog").append("Item NamePriceDe
很抱歉,如果我的问题之前已经被问过,或者它太明显了,但我真的需要澄清这一点。感谢您的帮助。 在多用户界面中,如果来自不同用户的相同事务同时到达服务器,会发生什么? 我有下一张表: create tab
这可能是一个愚蠢的问题,但是这个程序的输出(它的方式)可以为零吗? public class Test2{ int a = 0; AtomicInteger b = new Atomi
假设我本地主机上的一个网站处理每个请求大约需要 3 秒。这很好,正如预期的那样(因为它在幕后进行了一些奇特的网络)。 但是,如果我在选项卡(在 firefox 中)中打开相同的 url,然后同时重新加
我对 MongoDB 的读锁定有点困惑。单个集合可以支持多少个并发读取操作? 最佳答案 如 tk 给出的链接中所写:http://www.mongodb.org/pages/viewpage.acti
如果有四个并发的 CUDA 应用程序在一个 GPU 中竞争资源会发生什么这样他们就可以将工作卸载到图形卡上了? Cuda Programming Guide 3.1 提到那里 某些方法是异步的: 内核
👊上次的百度面试遇到了关于spark的并发数的问题,今天我们就来将这些问题都一并解决一下,图画的的有点丑,还行大家见谅,百度实习的问题我放在了下面的链接👇: 链接: 2022百度大数据开发工程师实
我对 Groovy 线程有疑问。 我的任务是以某种方式翻译给定目录中的每个文件 并将生成的输出放在其他目录中的文件中。 我编写了以下代码,该代码有效: static def translateDir(
Java中的同步和锁定有什么区别? 最佳答案 synchronized是语言关键字;锁是对象。 当一个方法或代码块被标记为同步时,您是说该方法或代码块必须先获得某个锁对象(可以在同步的语法中指定)才能
我需要创建一个能够同时处理来自客户端的多个请求的并发 RPC 服务器。 使用 rpcgen linux编译器(基于sun RPC),不支持-A为并发服务器创建 stub 的选项。 (-A 选项在 so
System.out.println("Enter the number of what you would like to do"); System.out.println("1 = Manuall
我正在将我的应用程序移植到 iOS 8.0 并注意到 UIAlertView 已被弃用。 所以我改变了使用 UIAlertController 的方法。这在大多数情况下都有效。 除了,当我的应用程序打
我正在逐行同时读取两个文本文件。 我特别想做的是当lineCount在每个线程上都是相同的我想看看扫描仪当前正在读取的字符串。 我环顾四周寻找可以实现的某些模式,例如 Compare and Swap
我正在阅读 Java Concurrency in Practice .在章节中断政策部分 取消和关闭 它提到 A task should not assume anything about the
我正在尝试学习线程,互斥等的基础知识。遵循here的文档和示例。在下面的代码中,我得到预期的输出。问题: 想确认我是否有任何陷阱?我们如何改善下面的代码? 我的线程在哪一行尝试获取互斥锁或正在等待互斥
并发是指两个任务在不同的线程上并行运行。但是,异步方法并行运行,但在同一个线程上。这是如何实现的?另外,并行性怎么样? 这三个概念有什么区别? 最佳答案 并发和并行实际上与您正确推测的原理相同,两者都
以此ConcurrentDouble类定义为例: public class ConcurrentDouble { public double num = 0; public void subt
在得知并发确实增加了许多人的吞吐量后,我一直计划在项目中使用并发。现在我在多线程或并发方面还没有做太多工作,因此决定在实际项目中使用它之前学习并进行简单的概念验证。 以下是我尝试过的两个示例: 1.
我是一名优秀的程序员,十分优秀!