Go 学习笔记——同步

  1. 1. 一、锁
    1. 1.1. 1、互斥锁
    2. 1.2. 2、读写锁
    3. 1.3. 3、示例
  2. 2. 二、条件变量(与OS中有所区别)
  3. 3. 三、原子操作
    1. 3.1. 1、增或减
    2. 3.2. 2、比较并交换
    3. 3.3. 3、载入
    4. 3.4. 4、存取
    5. 3.5. 5、交换
    6. 3.6. 6、原子值
      1. 3.6.1. 示例:一个并发安全的整数数组
    7. 3.7. 7、应用于实际
  4. 4. 四、只会执行一次
  5. 5. 五、WaitGroup
  6. 6. 六、临时对象池
  7. 7. 七、实战——Concurrent Map
    1. 7.1. 7.1、v1用读写锁实现

一、锁

1、互斥锁

由sync包中的Mutex结构体表示。

1
var mutex sync.Mutex

在锁定互斥锁后,紧接着使用defer语句保证该互斥锁的即使解锁

1
2
3
4
func write(){
mutex.Lock()
defer mutex.Unlock()
}

当对未锁定的互斥锁进行解锁时会引发一个运行时恐慌:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
defer func() {
fmt.Println("Try to recover the panic.")
if p := recover(); p != nil {
fmt.Printf("Recovered the panic(%#v).\n", p)
}
}()
var mutex sync.Mutex
fmt.Println("Lock the lock.")
mutex.Lock()
fmt.Println("The lock is locked.")
fmt.Println("Unlock the lock.")
mutex.Unlock()
fmt.Println("The lock is unlocked.")
fmt.Println("Unlock the lock again.")
mutex.Unlock()
}
//结果:
Lock the lock.
The lock is locked.
Unlock the lock.
The lock is unlocked.
Unlock the lock again.
fatal error: sync: unlock of unlocked mutex

最好把同一个互斥锁的锁定和解锁操作放在同一个层次的代码块中

2、读写锁

​ sync.RWMutex

写锁:

1
2
func (*RWMutex) Lock()
func (*RWMutex) Unlock()

读锁:

1
2
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()

写解锁时会唤醒所有因欲进行读锁定而被阻塞的goroutine,而读解锁时只会在已无任何读锁定的情况下,试图唤醒一个因欲进行写锁定而被阻塞的goroutine

sync.RWMutex还拥有一个指针方法——RLocker,其会返回一个实现了sync.Locker接口类型的值,它包含两个方法lock,Unlock。*RWMutex和*Mutex都是它的实现类型。调用RLocker方法后,得到的结果值,就是读写锁本身,只不过该值的lock和unlock方法对应的是读锁定和都解锁。

3、示例

os.File的值代表文件系统中的某个文件或目录。

os.File为操作文件系统提供了底层的支持。但是该类型的方法并没有对并发操作的安全性做出保证,即这些方法并不是并发安全的。

要求:

  • 数据库长度相同,在读写进行前给定。
  • 若写操作实际欲写入长度超过设定长度,则将会截掉超出部分

对于写:

  • 每次写的数据应作为独立、完整的数据块,互不干扰。

对于读:

  • 每次读取一个独立、完整的数据块。
  • 每次读的数据块不能重复,且按顺序读取。

由此创建接口类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type DataFile interface {
// Read 会读取一个数据块。
Read() (rsn int64, d Data, err error)
// Write 会写入一个数据块。
Write(d Data) (wsn int64, err error)
// RSN 会获取最后读取的数据块的序列号。
RSN() int64
// WSN 会获取最后写入的数据块的序列号。
WSN() int64
// DataLen 会获取数据块的长度。
DataLen() uint32
// Close 会关闭数据文件。
Close() error
}

二、条件变量(与OS中有所区别)

Go标准代码库中的sync.Cond类型代表了条件变量。创建一个可用的条件变量:

1
func NewCond(1 Locker) *Cond

*Cond有3个方法:Wait、Signal和Broadcast。分别代表等待通知、单发通知和广播通知。

Wait会自动地对与该条件变量关联的那个锁进行解锁,并使Wait所在的goroutine阻塞,一旦收到通知,该goroutine就会被唤醒,并立即尝试锁定该锁。

Signal和Broadcast都是唤醒发送通知,区别是前者的目标只有一个,后者的目标则是所有。

  • 一定要在调用rcond的Wait方法之前锁定与之相关的读锁,否则会引发不可恢复的运行时恐慌。
  • 一定要在读取数据之后及时解锁与条件变量相关联的读锁,否则对读写锁的写所的那个操作将会阻塞相关goroutine

在调用Signal和Broadcast之前,无需锁定与之关联的锁。Write方法中的锁定与解锁操作和signal()之间并没有联系。

三、原子操作

由sync/atomic中的众多函数代表。可以通过这些函数对几种简单类型(int32、int64、uint32、uint64、uintptr和unsafe.Pointer。)的值执行原子操作(增或减、比较并交换、载入、存储和交换)。

1、增或减

  • 函数名称都以“Add”为前缀,后跟针对具体类型的名称。例如AddUint32。sync/atomic包中所有函数的命名都遵循此规则。

  • 被操作类型只能是数值类型即:int32、int64、uint32、uint64、uintptr

  • 第二个参数的类型与被操作值的类型总是相同的

  • 如果要减,第二个参数为负值即可。

例如想原子的把一个uint32类型的变量i32的值增大3:

1
newi32 := atomic.AddInt32(&i32, 3)
  • 对于uint类型的值,如uint32,可以利用补码来实现减法,设NN=-3

    1
    newi32 := atomic.AddInt32(&i32, ^uint(-NN-1)))

2、比较并交换

  • 即“Compare And Swap”,简称CAS。

  • 在atomic包中,这类操作以”CompareAndSwap”为前缀的函数代表

  • 以针对int32类型的函数为例:

    1
    2
    3
    4
    fuc CompareAndSwapInt32(addr *int32,old,new int32)(swapped bool)
    //先判断addr指向的被操作数与old是否相等,相等进行下一步,否则忽略。
    //用参数new代表的新值代替旧值。
    //swap表示是否进行了值的替换操作
  • CAS总是假设被操作值未曾改变(即与旧值相等),一旦确定假设正确,就立刻进行值替换

  • 优势:可以在不创建互斥量和不形成临界区的情况下,完成并发安全的值替换,大大减少同步对程序性能的损耗

  • 劣势:在被操作值被频繁变更的情况下,CAS操作并不那么容易成功,有时不得不利用for循环进行多次尝试:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    var value int32
    func addValue(delta int32){
    for{
    v :=value
    if atomic.CompareAndSwap(value, v ,(v+delta)){
    break
    }
    }
    }

    虽然不会阻塞goroutine,但可能使流程的执行暂时停滞。

  • 若想并发安全地更新一些类型,应该总是优先选择CAS操作。

3、载入

  • 原子的读取某个值,函数的名称都以“Load”为前缀,以针对int32类型的函数为例:

    1
    2
    3
    4
    5
    6
    7
    8
    func addValue(delta int32){
    for{
    v :=atomic.LoadInt(&value)
    if atomic.CompareAndSwap(value, v ,(v+delta)){
    break
    }
    }
    }

4、存取

  • 以”Store”为前缀
  • 原子的写操作
  • 并不关心旧值是什么,所以总会成功
  • StoreInt32接受两个参数:*int32类型的指向被操作值的指针值。int32类型的新值。

5、交换

  • 以“Swap”为前缀
  • 不关心旧值,而是直接设置新值
  • 返回旧值
  • SwapInt32接受两个参数:*int32类型的指向被操作值的指针值。int32类型的新值。

6、原子值

  • atomic.Value是一个结构体类型。用于存储需要原子读写的值。

  • 可接受的被操作值类型不限

  • 简单声明即可得到一个可用的原子值实例:

    1
    var atomicVal atomic.Value
  • 两个公开的指针方法——Load和Store,前者返回一个interface{}类型的结果且不接受任何参数。后者接受一个interface{}类型的参数而没有任何结果。

  • 在未曾通过Store方法向原子值实例存储值之前,它的Load方法总会返回nil

  • 其Store方法的两个现在:

    • 传参值不能为nil
    • 一旦原子值存储了某一个类型的值,之后存储的值必须是该类型的
  • atomic.Value类型的变量一旦声明,其值就不应该被复制到它处

    会造成复制:作为源值赋给别的变量、作为参数传入函数、作为结果值从函数返回、作为元素值通过通道传递

    这虽然不会造成编译错误,但Go标准工具go vet会报告此类不正确(或含安全隐患)的用法。不过,其指针类型却不存在这个问题

示例:一个并发安全的整数数组

7、应用于实际

对于前面的示例中:

1
2
3
4
5
6
// 读取并更新读偏移量。
var offset int64
df.rmutex.Lock()
offset = df.roffset
df.roffset += int64(df.dataLen)
df.rmutex.Unlock()

可以通过原子操作改为:

1
2
3
4
5
6
7
8
// 读取并更新读偏移量
var offset int64
for {
offset = df.roffset
if atomic.CompareAndSwapInt64(&df.roffset, offset, (offset + int64(df.dataLen))) {
break//若CAS操作不成功说明在从读取到更新的期间,有其他并发操作对该值进行了更改。
}
}

在32位计算机上写入一个64位的整数,也会存在在并发安全方面的隐患。因此还应将

1
offset = df.roffset

改为:

1
offset = atomic.LoadInt64(&df.roffset)

四、只会执行一次

sync.Once和它的Do方法

  • 与互斥锁、读写锁一样,开箱即用:

    1
    2
    var one sync.Once
    once.Do(func(){fmt.println("Once!")})
  • Do接受一个无参数、无结果的函数值作为其参数,该方法一旦被调用,就会去调用作为参数的那个函数。

  • 对同一个sync.Once类型值的Do方法的有效调用次数永远会是1.即无论调用该方法多少次,无论传给它的参数值是否相同,都仅有第一次调用是有效的。无论怎样,只有第一次调用该方法时传递给它的那个函数会执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    package main

    import (
    "fmt"
    "math/rand"
    "sync"
    )

    func main() {
    var count int
    var once sync.Once
    max := rand.Intn(100)
    for i := 0; i < max; i++ {
    once.Do(func() {
    count++
    })
    }
    fmt.Printf("Count: %d.\n", count)
    }

五、WaitGroup

  • WaitGroup的值是并发安全的,也是开箱即用的:var wg sync.WaitGroup
  • 有三个指针方法:Add、Done和Wait
  • 是一个结构体类型:有一个代表计数的字节数组类型的字段,该字段用4字节表示给定计数,另用4字节表示等待计数。当一个WaitGroup类型的变量被声明之后,其中这两个计数都会是0.
  • 通过Add增大或减少其中的给定计数:wg.Add(3)或wg.Add(-3)
  • 一定不能让给定计数变为负值,这回引发运行恐慌
  • 还可以用Done,使给定计数减一:wg.Done()
  • 当调用wait时,会检查给定计数,若为0会立即返回,若大于0,阻塞调用的goroutine,同时给定计数加1,直到给定计数变为0时,才会唤醒因此而阻塞的所有goroutine,同时清零等待计数。
  • 一般用于协调多个goroutine的运行。

使用方法和规则

  • 对Add的第一次调用,发生在调用Done和Wait之前
  • 在一个WaitGroup类型值的生命周期内,其中的给定计数总是由起初的0变为某几个正整数,然后再归为0
  • 若在一个计数周期内调用wait方法,就会阻塞所在的goroutine直至周期结束的那一刻。
  • WaitGroup类型值可复用

六、临时对象池

  • sync.Pool可以看作存放临时值的容器(自动伸缩、高效、并发安全)。

  • 在用复合字面量初始化一个临时对象池的时候,可以为它唯一的公开字段New赋值。该字段类型是func() interface{},即一个函数类型(可以称为对象值生成函数)。赋给该字段的函数会被临时对象用来创建对象值。该函数一般仅在池中无可用对象值的时候才被调用

  • 有两个公开的指针方法:

    • Get:从池中获取一个interface{}类型的值
    • Put:把一个interface{}类型的值放入池中。
  • 若一个临时对象池的Put未被调用过,且New也未曾被赋予一个非Nil的函数值,Get返回的结果一定是nil

  • Get返回的不一定是存在池中的值,但若是,一定要在池中删除对应值。

  • 两个特性:

    • 可以把由其中对象值产生的存储压力进行分摊。会专门为每一个与操作它的goroutine相关联的P建立本地池。在Get被调用时,一般会先尝试从与本地P对应的那个本地私有池和本地共享池中获取一个对象值。若获取失败会尝试从其他P的本地共享池中偷一个对象值并直接返回给对方。若依然未果,会用当前临时对象池的对象值生成函数产生对象值(永远不会放到池中,而是直接返回给调用方)。Put会把参数值放到本地P的本地池中。每个相关P的本地共享池中的所有对象值,都是在当前临时对象池的范围内共享的。即随时会被偷走
    • 对垃圾回收友好。垃圾回收的执行一般会使临时对象池中的对象值全部被移除。即使我们不去出对象池中的某个对象值,它也不会永远待在临时对象池中,其生命周期取决于垃圾回收任务下一次执行时间。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    func main() {
    // 禁用GC,并保证在main函数执行结束前恢复GC。
    defer debug.SetGCPercent(debug.SetGCPercent(-1))
    var count int32
    newFunc := func() interface{} {
    return atomic.AddInt32(&count, 1)
    }
    pool := sync.Pool{New: newFunc}

    // New 字段值的作用。
    v1 := pool.Get()
    fmt.Printf("Value 1: %v\n", v1)

    // 临时对象池的存取。
    pool.Put(10)
    pool.Put(11)
    pool.Put(12)
    v2 := pool.Get()
    fmt.Printf("Value 2: %v\n", v2)

    // 垃圾回收对临时对象池的影响。
    debug.SetGCPercent(100)

    runtime.GC()
    // 在新版本(起码 1.15 及以后)的 Go 当中,sync.Pool 里的临时对象需要两次 GC 才会被真正清除掉。
    // 只一次 GC 的话只会让其中的临时对象被“打上记号”。
    // 更具体的说,只做一次 GC 只会使得 sync.Pool 里的临时对象被移动到池中的“备用区域”(详见 victim 字段)。
    // 在我们调用 sync.Pool 的 Get 方法时,如果 sync.Pool 的“本地区域”(详见 local 字段)当中没有可用的临时对象,
    // 那么 sync.Pool 会试图从这个“备用区域”中获取临时对象。
    // 如果“备用区域”也没有可用的临时对象,sync.Pool 才会去调用 New 函数。
    // 所以,这里的例子需要再添加一行对 runtime.GC() 函数的调用,才能使它的结果与最初的相同,并起到准确示范的作用。
    runtime.GC()

    v3 := pool.Get()
    fmt.Printf("Value 3: %v\n", v3)
    pool.New = nil
    v4 := pool.Get()
    fmt.Printf("Value 4: %v\n", v4)
    }
    //运行结果:
    Value 1: 1
    Value 2: 10
    Value 3: 2
    Value 4: <nil>

七、实战——Concurrent Map

Go提供的字典类型并不是并发安全的,因此需要使用一些同步的方法对它进行扩展

7.1、v1用读写锁实现

接口类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ConcurrentMap 代表并发安全的字典的接口。
type ConcurrentMap interface {
// Concurrency 会返回并发量。
Concurrency() int
// Put 会推送一个键-元素对。
// 注意!参数element的值不能为nil。
// 第一个返回值表示是否新增了键-元素对。
// 若键已存在,新元素值会替换旧的元素值。
Put(key string, element interface{}) (bool, error)
// Get 会获取与指定键关联的那个元素。
// 若返回nil,则说明指定的键不存在。
Get(key string) interface{}
// Delete 会删除指定的键-元素对。
// 若结果值为true则说明键已存在且已删除,否则说明键不存在。
Delete(key string) bool
// Len 会返回当前字典中键-元素对的数量。
Len() uint64
}

实现类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// myConcurrentMap 代表ConcurrentMap接口的实现类型。
type myConcurrentMap struct {
concurrency int
//代表并发量,也代表segments长度
segments []Segment
//一个Segment值代表一个散列段。
//每个散列段都提供对其包含的键-元素对的读写操作。
//这里的读写需要由互斥锁保证其并发安全性。
//有多少个散列段,就有多少个互斥锁分别加以保护
//这样的加锁方式称为“分段锁”
//segments长度在初始化时就要确定且不能更改
total uint64
//当前字典中键值对实际数量
}

创建实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// NewConcurrentMap 会创建一个ConcurrentMap类型的实例。
// 参数pairRedistributor可以为nil。
func NewConcurrentMap(
concurrency int,
pairRedistributor PairRedistributor) (ConcurrentMap, error) {
if concurrency <= 0 {
return nil, newIllegalParameterError("concurrency is too small")
}
if concurrency > MAX_CONCURRENCY {
return nil, newIllegalParameterError("concurrency is too large")
}
cmap := &myConcurrentMap{}
cmap.concurrency = concurrency
cmap.segments = make([]Segment, concurrency)
for i := 0; i < concurrency; i++ {
cmap.segments[i] =
newSegment(DEFAULT_BUCKET_NUMBER, pairRedistributor)
}
return cmap, nil
}

Put:

1
2
3
4
5
6
7
8
9
10
11
12
func (cmap *myConcurrentMap) Put(key string, element interface{}) (bool, error) {
p, err := newPair(key, element)
if err != nil {
return false, err
}
s := cmap.findSegment(p.Hash())
ok, err := s.Put(p)
if ok {
atomic.AddUint64(&cmap.total, 1)
}
return ok, err
}

Get:

1
2
3
4
5
6
7
8
9
func (cmap *myConcurrentMap) Get(key string) interface{} {
keyHash := hash(key)
s := cmap.findSegment(keyHash)
pair := s.GetWithHash(key, keyHash)
if pair == nil {
return nil
}
return pair.Element()
}