《Go语言编程》阅读笔记(2)
并发编程
并发基础
- 每个进程运行时都有自己的调用栈和堆(有一个完整的上下文),OS在调度进程时会保存被调度进程的上下文,该进程获取时间片后再恢复其上下文
- 多个进程是可以并发的,主流实现方式如下:
- 多进程:OS层面的并发模式,所有进程由内核管理,因此开销大
- 多线程:系统层面的并发模式
- 基于回调的非阻塞/异步IO:多线程模式容易耗尽服务器的内存和CPU,该模式对流程做了分割,通过事件驱动的方式异步使用IO,尽可能地少用线程
- 协程:本质上是用户态线程,不需要OS做抢占式调度,实现中寄存于线程
协程coroutine
- “轻量级线程”
goroutine
- 举例:让函数
func add(x, y int)
并发执行,则go add(1, 1)
- 被调用的函数返回时,该goroutine自动结束——如果函数有返回值,则返回值被丢弃
- 当main函数返回时,程序退出,并且不会等待其他goroutine结束——需要实现goroutine之间的通信
并发通信
- 工程上两种常见的并发通信模型:
- 数据共享:多个单元分别保持对同一个数据(内存数据、磁盘文件等)的引用,C通常使用此方法
- 消息:认为每个并发单元是独立的个体并有独立变量,不同单元间变量不共享,不同单元之间通过消息通信(作为输入和输出)
- go的消息通信机制称为channel
channel
用channel在多个goroutine之间传递消息,通过channel传递对象的过程和函数的参数传递类似
跨进程通信建议使用分布式系统的方法,例如使用Socket协议或HTTP协议
一个channel只能传递一种类型的值,需要声明时指定
- 定义10个channel的数组,每个channel给10个goroutine
- goroutine内部通过
ch <- 1
写入数据,该channel被读取前该操作被阻塞 - 所有goroutine启动后,通过
<-ch
从10个channel依次读取数据,对应channel写入数据前该操作同样被阻塞
channel语法:
一般的声明形式:
var channame chan ElementType
,ElementType指定该channel传递的元素类型,例如var ch chan int
和var m map[string] chan bool
定义channel:
ch := make(chan int, 1)
——定义有一个缓冲区的chan,此时向ch发送第一个数据时主协程不会退出数据写入/发送给channel:
ch <- value
,数据写入会阻塞程序,直到其他goroutine从channel读取:value := <-ch
若channel中没有写过数据,从channel中读取数据也会阻塞程序
select:类似switch,但每个case必须是I/O操作(第一个case从chan1中读取数据并忽略)
- select语句不使用default分支时,处于阻塞状态直到其中一个channel的收/发操作准备就绪。如果同时有多个channel的收/发操作准备就绪则随机选择其中一个
- select语句使用default分支时,处于非阻塞状态,从所有准备就绪(或者channel关闭或者缓冲区有值)的channel中随机选择其中一个。如果处于阻塞状态,则执行default分支
缓冲机制:创建要给带缓冲的channel:
ch := make(chan int, 1024)
,将缓冲区大小作为第二个参数传入,在缓冲区装满前都不会阻塞1
2
3
4
5
6
7
8//遍历缓冲区,这里range只有一个返回值
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 5
for value := range ch {
fmt.Println(value)
}
超时机制:可能存在通讯锁死的问题;go没有直接的超时处理机制,但可以利用select:此时如果ch为空,则程序会在1s后从timeout中读取一个数据,继续执行(go关键字表明,该函数会在另一个协程里执行)
channel的传递(这里没有看懂)
channel在定义后可以通过channel传递,类似于pipe(handle函数的输入为一系列PipeData,实现流式处理数据的目的)
单向channel:只能用于发送或接收数据——将一个channel送入函数时,可指定其为单向channel,从而限制函数对该channel的操作
关闭channel:
close(ch)
即可- 判断一个channel是否已经关闭:
x, ok := <-ch
,检查ok是否为false,为false表明已经关闭
多核并行
当前Go的编译器不能发现和利用多核的优势,虽然可以创建多个goroutine,但实际上所有goroutine运行在同一个CPU核心上
通过设置环境变量GOMAXPROCS控制使用CPU核心数目:在启动goroutine前,
runtime.GOMAXPROCS(16)
来设置CPU核心数目通过
runtime.Gosched()
控制goroutine出让时间片给其他goroutine
同步
同步锁
- sync包提供两种锁:
- Mutex:当一个goroutine获得Mutex后,其他goroutine只能等待其释放Mutex
- RWMutex:单写多读,读锁占用时会组织写但不阻止读
- RLock():读锁;Lock():写锁
- sync包提供两种锁:
全局唯一性操作:全局角度只运行一次的代码
once类型
sync包还提供atomic子包,提供对一些基础数据类型的原子操作函数,例如
func CompareAndSwapUint64
,比较和交换两个uint64数据,此时无需专门添加Lock操作
举例
等价二叉查找树(判定给定的两个树中序遍历结果是否相同):
tree.New(k)
用于构造一个随机结构的已排序的二叉查找树,其中值为k,2k,3k,...,10k
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47package main
import "golang.org/x/tour/tree"
import "fmt"
// Walk 步进 tree t 将所有的值从 tree 发送到 channel ch。
func Walk(t *tree.Tree, ch chan int) {
if t.Left != nil {
Walk(t.Left, ch)
}
ch <- t.Value
if t.Right != nil {
Walk(t.Right, ch)
}
}
// Same 检测树 t1 和 t2 是否有相同的中序遍历结果
func Same(t1, t2 *tree.Tree) bool {
ch1 := make(chan int,10)
ch2 := make(chan int 10)
go Walk(t1, ch1)
go Walk(t2, ch2)
for i := 0; i < 10; i++ {
n1, n2 := <-ch1, <-ch2
if n1 != n2 {
return false
}
}
return true
}
func main() {
same := Same(tree.New(1), tree.New(1))
if same {
fmt.Println("PASS : they are same")
} else {
fmt.Println("FAIL : they should be same.")
}
same = Same(tree.New(1), tree.New(2))
if !same {
fmt.Println("PASS : they are not same")
} else {
fmt.Println("FAIL : they should be not same")
}
}
// 正确运行结果都应该是Pass通过
sync.Mutex
互斥锁(Lock()和Unlock()),保证每次只有一个goroutine访问一个共享的变量,从而避免冲突,并且利用defer保证互斥锁一定被解锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter 的并发使用是安全的。
type SafeCounter struct {
v map[string]int
mux sync.Mutex
}
// Inc 增加给定 key 的计数器的值。
func (c *SafeCounter) Inc(key string) {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
c.v[key]++
c.mux.Unlock()
}
// Value 返回给定 key 的计数器的当前值。
func (c *SafeCounter) Value(key string) int {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
defer c.mux.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey")) // 1000
}