43、Go语言基础 - 并发编程 - sync 包中的相关方法
hi, 我是温新
sync 包是 Go 语言标准库中的一个重要包,用于提供各种同步和并发原语,以协调多个 goroutines 之间的操作。这个包包括了多种工具,如互斥锁、条件变量、等待组等,用于编写并发安全的代码。
在之前的案例中,使用过Sleep 的方式将主 goroutine 阻塞到所有 goroutine 结束。更好的做法是使用 sync 包中的 WaitGroup 来实现。
Sync 包功能
关于 sync 包有如下功能:
- 1、互斥锁(Mutex)
- 
sync.Mutex是互斥锁的类型,用于保护共享资源,确保只有一个 goroutine 能够访问它。可以使用Lock和Unlock方法来锁定和解锁互斥锁。 - 例如,通过互斥锁,可以确保多个 goroutines 安全地访问共享数据。
 
 - 
 - 
2、读写锁(RWMutex)
- 
sync.RWMutex是读写锁的类型,允许多个 goroutines 并发读取共享资源,但在写操作时需要互斥锁。 - 通过读写锁,可以实现读多写少的并发模式,提高性能。
 
 - 
 - 
3、互斥量 Map(Map)
- 
sync.Map是 Go 1.9 引入的一种并发安全的 Map 数据结构,可以用于多个 goroutines 安全地访问 Map。 
 - 
 - 
4、条件变量(Cond)
- 
sync.Cond是条件变量的类型,用于在多个 goroutines 之间传递信号和通知。 - 通过条件变量,可以实现等待和通知的机制,例如等待某个条件变为真后再执行操作。
 
 - 
 - 
5、等待组(WaitGroup)
- 
sync.WaitGroup用于等待一组 goroutines 完成工作。主要用于等待一组 goroutines 完成后再继续执行其他操作。 - 通过等待组,可以确保所有的 goroutines都已经完成,再进行下一步操作。
 
 - 
 - 
6、原子操作(atomic)
- 
sync/atomic包提供了原子操作函数,用于在不使用互斥锁的情况下执行原子操作,如原子增加、交换等操作。 
 - 
 - 
7、Once
- 
sync.Once用于确保某个操作只会执行一次,无论多少个 goroutines 并发调用它。 
 - 
 - 
8、Pool
- 
sync.Pool用于池化资源,例如对象池,以减少对象分配和垃圾回收的开销。 
 - 
 
关于 sync 包,这篇文章只学习 WaitGroup、Once。
WaitGroup
WaitGroup 定义格式如下:
type WaitGroup struct {
    noCopy noCopy
    statel [12]byte
    sema uint32
}
WaitGroup 即等待一组 goroutine 结束。父 goroutine 调用 Add 方法来设置应等待 goroutine 的数量。每个被等待的 goroutine 在结束时应调用 Done 方法。与此同时,主 goroutine 可调用 Wait 方法阻塞至所有 goroutine 结束。
WaitGroup 中的方法如下:
func (wg *WaitGroup) Add(delta int)
Done 方法如下:
func (wg *WaitGroup) Done()
Wait 方法如下:
func (wg *WaitGroup) Wait()
Wait 方法阻塞 goroutine 直到 WaitGroup 计数器减为 0。
WaitGroup 案例如下:
package main
import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
)
func main() {
    // 创建一个等待组 wg 用于协调多个 goroutine
    var wg sync.WaitGroup
    // 输出 wg 的类型
    fmt.Printf("%T\n", wg)
    // 输出 wg 的初始状态信息
    fmt.Println(wg)
    // 向等待组中添加 3 个等待计数
    wg.Add(3)
    // 设置随机数种子
    rand.Seed(time.Now().UnixNano())
    // 启动 3 个子 goroutine 来执行 printNum 函数
    go printNum(&wg, 1)
    go printNum(&wg, 2)
    go printNum(&wg, 3)
    // 等待所有子 goroutine 完成
    wg.Wait()
    // 使用 defer 在 main 函数结束时输出 "main 结束"
    defer fmt.Println("main 结束")
}
// printNum 函数用于在子 goroutine 中输出数字
func printNum(wg *sync.WaitGroup, num int) {
    for i := 0; i <= 3; i++ {
        // 使用字符串重复制表符以产生缩进效果
        pre := strings.Repeat("\t", num-1)
        fmt.Printf("%s 第 %d 号子 goroutine  %d\n", pre, num, i)
        time.Sleep(time.Second) // 休眠 1 秒
    }
    // 告诉等待组 wg 子 goroutine 已完成
    wg.Done()
}
输出结果
sync.WaitGroup
{{} {{} {} 0} 0}
                 第 3 号子 goroutine  0
 第 1 号子 goroutine  0
         第 2 号子 goroutine  0
         第 2 号子 goroutine  1
                 第 3 号子 goroutine  1
 第 1 号子 goroutine  1
 第 1 号子 goroutine  2
                 第 3 号子 goroutine  2
         第 2 号子 goroutine  2
         第 2 号子 goroutine  3
                 第 3 号子 goroutine  3
 第 1 号子 goroutine  3
mian 结束
这段代码演示了如何使用 sync.WaitGroup 来等待多个 goroutines 完成任务,以确保它们都完成后再继续执行主函数。
Once
sync.Once 是 Go 语言标准库中的同步机制之一,用于确保某个操作只会执行一次,无论多少个 goroutines 并发调用它。
源码定义
type Once struct {
    m    Mutex
    done uint32
}
它包含一个互斥锁(m)和一个 done 字段,done 用于标记是否已经执行过操作。
方法
func (o *Once) Do(f func())
- 
o是sync.Once的指针。 - 
f是一个函数,它是要确保只执行一次的操作。 
工作原理
当第一次调用 Do 方法时,它会使用互斥锁 m 来确保只有一个 goroutine 能够进入执行 f 函数。同时,它会设置 done 字段,标记操作已经执行。
对于后续的调用,它会首先检查 done 字段,如果已经执行过操作,就不再执行 f 函数,直接返回。这样可以避免重复执行操作。
应用场景
- 
初始化:
sync.Once常用于执行初始化操作,例如数据库连接的初始化、全局配置的加载等。确保这些初始化操作只执行一次,避免重复初始化的开销和可能引发的问题。 - 
单例模式:
sync.Once也可以用于实现单例模式,保证某个对象只被创建一次。 - 
延迟初始化:在需要某个资源时,用 
sync.Once延迟初始化,以提高性能和资源利用率。 
代码案例
package main
import (
	"fmt"
	"sync"
)
func main() {
	var once sync.Once
	initialize := func() {
		// 这个函数只会执行一次
		fmt.Println("初始化操作")
	}
	for i := 0; i < 3; i++ {
		once.Do(initialize)
		fmt.Printf("第%d次调用\n", i+1)
	}
}
输出结果
初始化操作
第1次调用
第2次调用
第3次调用
在这个案例中,initialize 函数只会在第一次调用 once.Do() 时执行,后续调用不会再次执行。
sync.Map
sync.Map 是 Go 语言标准库中的一种并发安全的 Map 数据结构,它允许多个 goroutines 安全地对 Map 进行读写操作,而无需显式地加锁。
sync.Map 类型:
sync.Map 的类型定义如下:
goCopy codetype Map struct {
    mu sync.Mutex
    m  map[interface{}]interface{}
}
- 
mu是一个互斥锁,用于保护 Map 的读写操作。 - 
m是一个底层的 map,存储键值对。 
syan.Map 方法
- 
Load(key interface{}) (value interface{}, ok bool):用于读取指定键的值。 - 
Store(key, value interface{}):用于设置键值对。 - 
LoadOrStore(key, value interface{}) (actual interface{}, loaded bool):用于读取指定键的值,如果不存在则设置键值对。 - 
Delete(key interface{}):用于删除指定键的键值对。 - 
Range(f func(key, value interface{}) bool):用于遍历 Map 中的键值对。 
Map 案例
package main
import (
	"fmt"
	"sync"
)
func main() {
	// 创建一个新的 sync.Map
	var m sync.Map
	// 存储键值对
	m.Store("name", "王美丽")
	m.Store("age", 19)
	// 读取键值对
	name, _ := m.Load("name")
	age, _ := m.Load("age")
	fmt.Printf("名字: %s, 芳龄: %d\n", name, age)
	// 删除键值对
	m.Delete("age")
	// 使用 Range 遍历所有键值对
	m.Range(func(key, value interface{}) bool {
		fmt.Printf("键: %s, 值: %v\n", key, value)
		return true // 继续遍历
	})
}
结果如下
名字: 王美丽, 芳龄: 19
键: name, 值: 王美丽
这个示例演示了如何使用 sync.Map 进行并发安全的键值存储和读取,以及如何遍历其中的键值对。由于 sync.Map 内部使用了锁机制,因此可以安全地在多个 goroutines 中使用。
atomic 原子操作
Go的sync/atomic包提供了一组原子操作函数,用于在多个goroutines之间安全地执行原子操作,而不需要显式地使用锁。这些原子操作能够确保操作的不可分割性,避免竞态条件和数据竞争,是并发编程的重要工具。
常用原子操作函数:
- 
AddInt32(ptr *int32, delta int32) int32:原子地将*ptr和delta相加并返回新值。 - 
AddInt64(ptr *int64, delta int64) int64:原子地将*ptr和delta相加并返回新值。 - 
LoadInt32(ptr *int32) int32:原子地加载*ptr的值并返回。 - 
LoadInt64(ptr *int64) int64:原子地加载*ptr的值并返回。 - 
StoreInt32(ptr *int32, val int32):原子地将*ptr的值设置为val。 - 
StoreInt64(ptr *int64, val int64):原子地将*ptr的值设置为val。 - 
SwapInt32(ptr *int32, new int32) int32:原子地将*ptr的值设置为new并返回旧值。 - 
SwapInt64(ptr *int64, new int64) int64:原子地将*ptr的值设置为new并返回旧值。 
atomic 案例
package main
import (
	"fmt"
	"sync/atomic"
	"time"
)
func main() {
	var counter int32
	// 启动多个 goroutines 来增加计数器的值
	for i := 0; i < 5; i++ {
		go incrementCounter(&counter)
	}
	// 等待一段时间,以确保所有 goroutines 执行完毕
	time.Sleep(1 * time.Second)
	// 读取并输出计数器的最终值
	finalValue := atomic.LoadInt32(&counter)
	fmt.Printf("最终计数器值: %d\n", finalValue) // 结果是:50000
}
func incrementCounter(counter *int32) {
	for i := 0; i < 10000; i++ {
		atomic.AddInt32(counter, 1) // 使用原子增加操作
	}
}
这个示例演示了如何使用 sync/atomic 包中的原子操作函数,避免竞态条件和数据竞争,并确保多个 goroutines 安全地对计数器进行操作。原子操作能够保证操作的不可分割性,从而避免了并发问题。在这个示例中,多个 goroutines 并发地对计数器进行增加操作,但由于使用了原子操作,不会出现竞争条件,最终输出了正确的计数器值
代码解释:
1、创建一个 int32 类型的计数器 counter,这是一个需要原子操作的变量;
2、使用一个 for 循环启动了 5 个 goroutines,每个 goroutine 调用 incrementCounter 函数来增加计数器的值。这里模拟了多个 goroutines 同时对计数器进行操作;
3、使用 time.Sleep(1 * time.Second) 来等待一段时间,以确保所有的 goroutines 执行完毕。这是一个粗略的等待方式,实际应用中可以使用更精确的同步方法;
4、使用 atomic.LoadInt32(&counter) 来读取计数器的最终值,并将其输出。这是原子操作,保证在多个 goroutines 中进行安全的读取;
5、incrementCounter 函数中,我们使用 atomic.AddInt32(counter, 1) 来进行原子增加操作。这确保了在多个 goroutines 中对计数器的并发增加是安全的。
atomic 包
原子操作有一个 atomic 包,该包有相关操作方法,如下:
| 方法 | 解释 | 
|---|---|
| func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) | 读取操作 | 
| func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) | 写入操作 | 
| func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 修改操作 | 
| func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 交换操作 | 
| func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) | 比较并交换操作 |